1 |
/* |
---|
2 |
* Copyright (c) 2007-2011, OmniTI Computer Consulting, Inc. |
---|
3 |
* All rights reserved. |
---|
4 |
* |
---|
5 |
* Redistribution and use in source and binary forms, with or without |
---|
6 |
* modification, are permitted provided that the following conditions are |
---|
7 |
* met: |
---|
8 |
* |
---|
9 |
* * Redistributions of source code must retain the above copyright |
---|
10 |
* notice, this list of conditions and the following disclaimer. |
---|
11 |
* * Redistributions in binary form must reproduce the above |
---|
12 |
* copyright notice, this list of conditions and the following |
---|
13 |
* disclaimer in the documentation and/or other materials provided |
---|
14 |
* with the distribution. |
---|
15 |
* * Neither the name OmniTI Computer Consulting, Inc. nor the names |
---|
16 |
* of its contributors may be used to endorse or promote products |
---|
17 |
* derived from this software without specific prior written |
---|
18 |
* permission. |
---|
19 |
* |
---|
20 |
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
---|
21 |
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
---|
22 |
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
---|
23 |
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
---|
24 |
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
---|
25 |
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
---|
26 |
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
---|
27 |
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
---|
28 |
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
---|
29 |
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
---|
30 |
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
---|
31 |
*/ |
---|
32 |
|
---|
33 |
#include "noit_defines.h" |
---|
34 |
#include "noit_module.h" |
---|
35 |
#include "eventer/eventer.h" |
---|
36 |
#include "utils/noit_log.h" |
---|
37 |
#include "utils/noit_b64.h" |
---|
38 |
#include "utils/noit_str.h" |
---|
39 |
#include "utils/noit_mkdir.h" |
---|
40 |
#include "utils/noit_getip.h" |
---|
41 |
#include "utils/noit_watchdog.h" |
---|
42 |
#include "stratcon_datastore.h" |
---|
43 |
#include "stratcon_realtime_http.h" |
---|
44 |
#include "stratcon_iep.h" |
---|
45 |
#include "noit_conf.h" |
---|
46 |
#include "noit_check.h" |
---|
47 |
#include "noit_check_log_helpers.h" |
---|
48 |
#include "noit_rest.h" |
---|
49 |
#include <unistd.h> |
---|
50 |
#include <fcntl.h> |
---|
51 |
#include <netinet/in.h> |
---|
52 |
#include <sys/un.h> |
---|
53 |
#include <dirent.h> |
---|
54 |
#include <arpa/inet.h> |
---|
55 |
#include <sys/mman.h> |
---|
56 |
#include <libpq-fe.h> |
---|
57 |
#include <zlib.h> |
---|
58 |
#include <assert.h> |
---|
59 |
#include <errno.h> |
---|
60 |
#include "postgres_ingestor.xmlh" |
---|
61 |
#include "bundle.pb-c.h" |
---|
62 |
|
---|
63 |
/*
 * DECL_STMT(codename, confname) declares two file-scope statics:
 *   codename      - the SQL text; NULL until GET_QUERY() loads it
 *   codename_conf - the configuration path the SQL text is read from,
 *                   "/stratcon/database/statements/<confname>"
 */
#define DECL_STMT(codename,confname) \
  static char *codename = NULL; \
  static const char *codename##_conf = \
    "/stratcon/database/statements/" #confname

/* Statements run right after a connection is established */
DECL_STMT(storage_post_connect, storagepostconnect);
DECL_STMT(metanode_post_connect, metanodepostconnect);

/* Storage node discovery and check-to-node mapping */
DECL_STMT(find_storage, findstoragenode);
DECL_STMT(all_storage, allstoragenodes);
DECL_STMT(check_map, mapchecktostoragenode);
DECL_STMT(check_mapall, mapallchecks);

/* Check lookup and loading */
DECL_STMT(check_loadall, allchecks);
DECL_STMT(check_find, findcheck);

/* Per-record inserts */
DECL_STMT(check_insert, check);
DECL_STMT(status_insert, status);
DECL_STMT(metric_insert_numeric, metric_numeric);
DECL_STMT(metric_insert_text, metric_text);

/* noitd configuration storage */
DECL_STMT(config_insert, config);
DECL_STMT(config_get, findconfig);
---|
81 |
|
---|
82 |
/* Log streams used throughout this file; every one starts out NULL. */
static noit_log_stream_t ds_err = NULL;      /* PG_EXEC / PG_TM_EXEC failures */
static noit_log_stream_t ds_deb = NULL;
static noit_log_stream_t ds_pool_deb = NULL; /* connection pool tracing */
static noit_log_stream_t ingest_err = NULL;
---|
86 |
|
---|
87 |
/*
 * GET_QUERY(a): make sure the statement `a` (declared with DECL_STMT) is
 * loaded.  While `a` is still NULL it is fetched from the configuration path
 * held in `a_conf`; if that fetch fails, control jumps to the `bad_row`
 * label, which every function using this macro must provide.
 */
#define GET_QUERY(a) do { \
  if((a) == NULL && \
     !noit_conf_get_string(NULL, a ## _conf, &(a))) \
    goto bad_row; \
} while(0)
---|
92 |
|
---|
93 |
struct conn_q; |
---|
94 |
|
---|
95 |
typedef struct { |
---|
96 |
char *queue_name; /* the key fqdn+remote_sn */ |
---|
97 |
eventer_jobq_t *jobq; |
---|
98 |
struct conn_q *head; |
---|
99 |
pthread_mutex_t lock; |
---|
100 |
pthread_cond_t cv; |
---|
101 |
int ttl; |
---|
102 |
int in_pool; |
---|
103 |
int outstanding; |
---|
104 |
int max_allocated; |
---|
105 |
int max_in_pool; |
---|
106 |
} conn_pool; |
---|
107 |
typedef struct conn_q { |
---|
108 |
time_t last_use; |
---|
109 |
char *dsn; /* Pg connect string */ |
---|
110 |
char *remote_str; /* the IP of the noit*/ |
---|
111 |
char *remote_cn; /* the Cert CN of the noit */ |
---|
112 |
char *fqdn; /* the fqdn of the storage node */ |
---|
113 |
conn_pool *pool; |
---|
114 |
struct conn_q *next; |
---|
115 |
/* Postgres specific stuff */ |
---|
116 |
PGconn *dbh; |
---|
117 |
} conn_q; |
---|
118 |
|
---|
119 |
|
---|
120 |
/* Upper bound on bound parameters carried by one statement execution. */
#define MAX_PARAMS 8

/*
 * Common leading members of every ds_*_detail structure: the last result,
 * its status, the record timestamp, and the parameter arrays handed to
 * PQexecParams.  paramAllocd[i] is non-zero when paramValues[i] is owned
 * (and must be freed) by this structure.
 */
#define POSTGRES_PARTS \
  PGresult *res; \
  int rv; \
  time_t whence; \
  int nparams; \
  int metric_type; \
  char *paramValues[MAX_PARAMS]; \
  int paramLengths[MAX_PARAMS]; \
  int paramFormats[MAX_PARAMS]; \
  int paramAllocd[MAX_PARAMS];
---|
131 |
|
---|
132 |
typedef struct ds_single_detail { |
---|
133 |
POSTGRES_PARTS |
---|
134 |
} ds_single_detail; |
---|
135 |
typedef struct { |
---|
136 |
/* Postgres specific stuff */ |
---|
137 |
POSTGRES_PARTS |
---|
138 |
struct realtime_tracker *rt; |
---|
139 |
conn_q *cq; /* connection on which to perform this job */ |
---|
140 |
eventer_t completion_event; /* This event should be registered if non NULL */ |
---|
141 |
} ds_rt_detail; |
---|
142 |
typedef struct ds_line_detail { |
---|
143 |
/* Postgres specific stuff */ |
---|
144 |
POSTGRES_PARTS |
---|
145 |
char *data; |
---|
146 |
int problematic; |
---|
147 |
struct ds_line_detail *next; |
---|
148 |
} ds_line_detail; |
---|
149 |
|
---|
150 |
typedef struct { |
---|
151 |
char *remote_str; |
---|
152 |
char *remote_cn; |
---|
153 |
char *fqdn; |
---|
154 |
int storagenode_id; |
---|
155 |
int fd; |
---|
156 |
char *filename; |
---|
157 |
conn_pool *cpool; |
---|
158 |
} pg_interim_journal_t; |
---|
159 |
|
---|
160 |
/* Forward declarations for helpers defined further down in this file. */
static int
stratcon_database_connect(conn_q *cq);

static int
uuid_to_sid(const char *uuid_str_in, const char *remote_cn);

static int
storage_node_quick_lookup(const char *uuid_str,
                          const char *remote_cn,
                          int *sid_out, int *storagenode_id_out,
                          const char **remote_cn_out,
                          const char **fqdn_out,
                          const char **dsn_out);
---|
168 |
|
---|
169 |
static void |
---|
170 |
free_params(ds_single_detail *d) { |
---|
171 |
int i; |
---|
172 |
for(i=0; i<d->nparams; i++) |
---|
173 |
if(d->paramAllocd[i] && d->paramValues[i]) |
---|
174 |
free(d->paramValues[i]); |
---|
175 |
} |
---|
176 |
|
---|
177 |
static char *basejpath = NULL; |
---|
178 |
static pthread_mutex_t ds_conns_lock; |
---|
179 |
static noit_hash_table ds_conns; |
---|
180 |
|
---|
181 |
/* the fqdn cache needs to be thread safe */

/* What is remembered about one check uuid. */
typedef struct {
  char *uuid_str;
  char *remote_cn;
  int storagenode_id;
  int sid;
} uuid_info;

/* What is remembered about one storage node. */
typedef struct {
  int storagenode_id;
  char *fqdn;   /* passed as the fqdn when requesting a connection */
  char *dsn;    /* passed as the connect string when requesting a connection */
} storagenode_info;
---|
193 |
/* Cache keyed by check uuid -- presumably holding uuid_info values;
 * confirm against the lookup code. */
noit_hash_table uuid_to_info_cache;
/* Cache of storagenode_info values; storagenode_to_info_cache_lock is held
 * while the table is iterated. */
pthread_mutex_t storagenode_to_info_cache_lock;
noit_hash_table storagenode_to_info_cache;
---|
196 |
|
---|
197 |
/* Thread-safe connection pools */ |
---|
198 |
|
---|
199 |
/* Forcefree -> 1 prevents it from going to the pool and it gets freed */ |
---|
200 |
static void |
---|
201 |
release_conn_q_forceable(conn_q *cq, int forcefree) { |
---|
202 |
int putback = 0; |
---|
203 |
cq->last_use = time(NULL); |
---|
204 |
pthread_mutex_lock(&cq->pool->lock); |
---|
205 |
cq->pool->outstanding--; |
---|
206 |
if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) { |
---|
207 |
putback = 1; |
---|
208 |
cq->next = cq->pool->head; |
---|
209 |
cq->pool->head = cq; |
---|
210 |
cq->pool->in_pool++; |
---|
211 |
} |
---|
212 |
pthread_mutex_unlock(&cq->pool->lock); |
---|
213 |
noitL(ds_pool_deb, "[%p] release %s [%s]\n", (void *)pthread_self(), |
---|
214 |
putback ? "to pool" : "and destroy", cq->pool->queue_name); |
---|
215 |
pthread_cond_signal(&cq->pool->cv); |
---|
216 |
if(putback) return; |
---|
217 |
|
---|
218 |
/* Not put back, release it */ |
---|
219 |
if(cq->dbh) PQfinish(cq->dbh); |
---|
220 |
if(cq->remote_str) free(cq->remote_str); |
---|
221 |
if(cq->remote_cn) free(cq->remote_cn); |
---|
222 |
if(cq->fqdn) free(cq->fqdn); |
---|
223 |
if(cq->dsn) free(cq->dsn); |
---|
224 |
free(cq); |
---|
225 |
} |
---|
226 |
static void |
---|
227 |
ttl_purge_conn_pool(conn_pool *pool) { |
---|
228 |
int old_cnt, new_cnt; |
---|
229 |
time_t now = time(NULL); |
---|
230 |
conn_q *cq, *prev = NULL, *iter; |
---|
231 |
/* because we always replace on the head and update the last_use time when |
---|
232 |
doing so, we know they are ordered LRU on the end. So, once we hit an |
---|
233 |
old one, we know all the others are old too. |
---|
234 |
*/ |
---|
235 |
if(!pool->head) return; /* hack short circuit for no locks */ |
---|
236 |
pthread_mutex_lock(&pool->lock); |
---|
237 |
old_cnt = pool->in_pool; |
---|
238 |
cq = pool->head; |
---|
239 |
while(cq) { |
---|
240 |
if(cq->last_use + cq->pool->ttl < now) { |
---|
241 |
if(prev) prev->next = NULL; |
---|
242 |
else pool->head = NULL; |
---|
243 |
break; |
---|
244 |
} |
---|
245 |
prev = cq; |
---|
246 |
cq = cq->next; |
---|
247 |
} |
---|
248 |
/* Now pool->head is a chain of unexpired and cq is a chain of expired */ |
---|
249 |
/* Fix accounting */ |
---|
250 |
for(iter=cq; iter; iter=iter->next) pool->in_pool--; |
---|
251 |
new_cnt = pool->in_pool; |
---|
252 |
pthread_mutex_unlock(&pool->lock); |
---|
253 |
|
---|
254 |
/* Force release these without holding the lock */ |
---|
255 |
while(cq) { |
---|
256 |
cq = cq->next; |
---|
257 |
release_conn_q_forceable(cq, 1); |
---|
258 |
} |
---|
259 |
if(old_cnt != new_cnt) |
---|
260 |
noitL(ds_pool_deb, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt, |
---|
261 |
pool->queue_name); |
---|
262 |
} |
---|
263 |
static void |
---|
264 |
release_conn_q(conn_q *cq) { |
---|
265 |
ttl_purge_conn_pool(cq->pool); |
---|
266 |
release_conn_q_forceable(cq, 0); |
---|
267 |
} |
---|
268 |
static conn_pool * |
---|
269 |
get_conn_pool_for_remote(const char *remote_str, |
---|
270 |
const char *remote_cn, const char *fqdn) { |
---|
271 |
void *vcpool; |
---|
272 |
conn_pool *cpool = NULL; |
---|
273 |
char queue_name[256] = "datastore_"; |
---|
274 |
snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s", |
---|
275 |
(remote_str && *remote_str) ? remote_str : "0.0.0.0", |
---|
276 |
fqdn ? fqdn : "default", |
---|
277 |
remote_cn ? remote_cn : "default"); |
---|
278 |
pthread_mutex_lock(&ds_conns_lock); |
---|
279 |
if(noit_hash_retrieve(&ds_conns, (const char *)queue_name, |
---|
280 |
strlen(queue_name), &vcpool)) |
---|
281 |
cpool = vcpool; |
---|
282 |
pthread_mutex_unlock(&ds_conns_lock); |
---|
283 |
if(!cpool) { |
---|
284 |
vcpool = cpool = calloc(1, sizeof(*cpool)); |
---|
285 |
cpool->queue_name = strdup(queue_name); |
---|
286 |
pthread_mutex_init(&cpool->lock, NULL); |
---|
287 |
pthread_cond_init(&cpool->cv, NULL); |
---|
288 |
cpool->in_pool = 0; |
---|
289 |
cpool->outstanding = 0; |
---|
290 |
cpool->max_in_pool = 1; |
---|
291 |
cpool->max_allocated = 1; |
---|
292 |
pthread_mutex_lock(&ds_conns_lock); |
---|
293 |
if(!noit_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name), |
---|
294 |
cpool)) { |
---|
295 |
noit_hash_retrieve(&ds_conns, (const char *)queue_name, |
---|
296 |
strlen(queue_name), &vcpool); |
---|
297 |
} |
---|
298 |
pthread_mutex_unlock(&ds_conns_lock); |
---|
299 |
if(vcpool != cpool) { |
---|
300 |
/* someone beat us to it */ |
---|
301 |
free(cpool->queue_name); |
---|
302 |
pthread_mutex_destroy(&cpool->lock); |
---|
303 |
pthread_cond_destroy(&cpool->cv); |
---|
304 |
free(cpool); |
---|
305 |
} |
---|
306 |
else { |
---|
307 |
int i; |
---|
308 |
/* Our job to setup the pool */ |
---|
309 |
cpool->jobq = calloc(1, sizeof(*cpool->jobq)); |
---|
310 |
eventer_jobq_init(cpool->jobq, queue_name); |
---|
311 |
cpool->jobq->backq = eventer_default_backq(); |
---|
312 |
/* Add one thread */ |
---|
313 |
for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++) |
---|
314 |
eventer_jobq_increase_concurrency(cpool->jobq); |
---|
315 |
} |
---|
316 |
cpool = vcpool; |
---|
317 |
} |
---|
318 |
return cpool; |
---|
319 |
} |
---|
320 |
static conn_q * |
---|
321 |
get_conn_q_for_remote(const char *remote_str, |
---|
322 |
const char *remote_cn, const char *fqdn, |
---|
323 |
const char *dsn) { |
---|
324 |
conn_pool *cpool; |
---|
325 |
conn_q *cq; |
---|
326 |
cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn); |
---|
327 |
noitL(ds_pool_deb, "[%p] requesting [%s]\n", (void *)pthread_self(), |
---|
328 |
cpool->queue_name); |
---|
329 |
pthread_mutex_lock(&cpool->lock); |
---|
330 |
again: |
---|
331 |
if(cpool->head) { |
---|
332 |
assert(cpool->in_pool > 0); |
---|
333 |
cq = cpool->head; |
---|
334 |
cpool->head = cq->next; |
---|
335 |
cpool->in_pool--; |
---|
336 |
cpool->outstanding++; |
---|
337 |
cq->next = NULL; |
---|
338 |
pthread_mutex_unlock(&cpool->lock); |
---|
339 |
return cq; |
---|
340 |
} |
---|
341 |
if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) { |
---|
342 |
noitL(ds_pool_deb, "[%p] over-subscribed, waiting [%s]\n", |
---|
343 |
(void *)pthread_self(), cpool->queue_name); |
---|
344 |
pthread_cond_wait(&cpool->cv, &cpool->lock); |
---|
345 |
noitL(ds_pool_deb, "[%p] waking up and trying again [%s]\n", |
---|
346 |
(void *)pthread_self(), cpool->queue_name); |
---|
347 |
goto again; |
---|
348 |
} |
---|
349 |
else { |
---|
350 |
cpool->outstanding++; |
---|
351 |
pthread_mutex_unlock(&cpool->lock); |
---|
352 |
} |
---|
353 |
|
---|
354 |
cq = calloc(1, sizeof(*cq)); |
---|
355 |
cq->pool = cpool; |
---|
356 |
cq->remote_str = remote_str ? strdup(remote_str) : NULL; |
---|
357 |
cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL; |
---|
358 |
cq->fqdn = fqdn ? strdup(fqdn) : NULL; |
---|
359 |
cq->dsn = dsn ? strdup(dsn) : NULL; |
---|
360 |
return cq; |
---|
361 |
} |
---|
362 |
/*
 * Connection for the metanode: requested with no remote address, CN, fqdn
 * or dsn, which selects the default pool.
 */
static conn_q *
get_conn_q_for_metanode() {
  conn_q *meta = get_conn_q_for_remote(NULL, NULL, NULL, NULL);
  return meta;
}
---|
366 |
|
---|
367 |
/* Result codes returned by the stratcon_ingest_* execution routines. */
typedef enum {
  DS_EXEC_SUCCESS    = 0,  /* the row was handled */
  DS_EXEC_ROW_FAILED = 1,  /* this row could not be parsed or executed */
  DS_EXEC_TXN_FAILED = 2   /* not returned by the code visible here */
} execute_outcome_t;
---|
372 |
|
---|
373 |
/*
 * DECLARE_PARAM_STR(str, len): append a text parameter to the in-scope
 * detail structure `d`.  The first `len` bytes of `str` are copied; the
 * copy is owned by `d` (paramAllocd set).  The literal value "[[null]]" is
 * the in-band way to bind SQL NULL: the copy is dropped and a NULL pointer
 * with length 0 is stored instead.
 */
#define DECLARE_PARAM_STR(str, len) do { \
  int dps_slot_ = d->nparams; \
  d->paramValues[dps_slot_] = noit__strndup(str, len); \
  d->paramLengths[dps_slot_] = len; \
  d->paramFormats[dps_slot_] = 0; \
  d->paramAllocd[dps_slot_] = 1; \
  if(strcmp(d->paramValues[dps_slot_], "[[null]]") == 0) { \
    free(d->paramValues[dps_slot_]); \
    d->paramValues[dps_slot_] = NULL; \
    d->paramLengths[dps_slot_] = 0; \
    d->paramAllocd[dps_slot_] = 0; \
  } \
  d->nparams++; \
} while(0)

/*
 * DECLARE_PARAM_INT(i): append an int parameter, rendered as decimal text
 * and bound through DECLARE_PARAM_STR.
 */
#define DECLARE_PARAM_INT(i) do { \
  char dpi_text_[32]; \
  int dpi_len_; \
  snprintf(dpi_text_, sizeof(dpi_text_), "%d", (i)); \
  dpi_len_ = strlen(dpi_text_); \
  DECLARE_PARAM_STR(dpi_text_, dpi_len_); \
} while(0)
---|
393 |
|
---|
394 |
/*
 * PG_GET_STR_COL(dest, row, name): point `dest` at the text of column
 * `name` in row `row` of d->res.  `dest` becomes NULL when the column does
 * not exist in the result or the value is SQL NULL.  The pointer refers to
 * storage inside d->res and is only usable until that result is cleared.
 */
#define PG_GET_STR_COL(dest, row, name) do { \
  int colnum = PQfnumber(d->res, name); \
  if (colnum >= 0 && !PQgetisnull(d->res, row, colnum)) \
    dest = PQgetvalue(d->res, row, colnum); \
  else \
    dest = NULL; \
} while(0)
---|
401 |
|
---|
402 |
/*
 * PG_EXEC(cmd): run `cmd` on cq->dbh with the parameters accumulated in
 * `d`, leaving the result in d->res and its status in d->rv.
 *
 * On any status other than PGRES_COMMAND_OK / PGRES_TUPLES_OK the first
 * line of the server error is logged, the result is cleared and control
 * jumps to the caller's `bad_row` label.  On success the caller owns
 * d->res and must PQclear() it.
 */
#define PG_EXEC(cmd) do { \
  d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(!(d->rv == PGRES_COMMAND_OK || d->rv == PGRES_TUPLES_OK)) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(pgerr_end == NULL) pgerr_end = pgerr + strlen(pgerr); \
    noitL(ds_err, "[%s] stratcon_datasource.c:%d bad (%d): %.*s\n", \
          cq->fqdn ? cq->fqdn : "metanode", __LINE__, d->rv, \
          (int)(pgerr_end - pgerr), pgerr); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
---|
419 |
|
---|
420 |
/*
 * PG_TM_EXEC(cmd, whence): like PG_EXEC, but `cmd` is first passed through
 * strftime() with `whence` broken down as UTC, so strftime conversions
 * embedded in the statement text are expanded from the record timestamp.
 *
 * `whence` is evaluated exactly once.  If the timestamp cannot be broken
 * down, or the expanded statement does not fit the 4096-byte buffer
 * (strftime returns 0 and leaves the buffer contents unspecified), nothing
 * is sent to the server and control jumps to `bad_row`.  Server-side
 * failures are logged, the result is cleared, and control jumps to
 * `bad_row` as well.  On success the caller owns d->res.
 */
#define PG_TM_EXEC(cmd, whence) do { \
  time_t pg_tm_when_ = (whence); \
  char cmdbuf[4096]; \
  struct tm tbuf, *tm; \
  tm = gmtime_r(&pg_tm_when_, &tbuf); \
  if(!tm || strftime(cmdbuf, sizeof(cmdbuf), cmd, tm) == 0) { \
    noitL(ds_err, "stratcon_datasource.c:%d cannot expand statement for time: %llu\n", \
          __LINE__, (long long unsigned)pg_tm_when_); \
    goto bad_row; \
  } \
  d->res = PQexecParams(cq->dbh, cmdbuf, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
    noitL(ds_err, "stratcon_datasource.c:%d bad (%d): %.*s time: %llu\n", \
          __LINE__, d->rv, (int)(pgerr_end - pgerr), pgerr, \
          (long long unsigned)pg_tm_when_); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
---|
442 |
|
---|
443 |
static void * |
---|
444 |
stratcon_ingest_check_loadall(void *vsn) { |
---|
445 |
storagenode_info *sn = vsn; |
---|
446 |
ds_single_detail *d; |
---|
447 |
int i, row_count = 0, good = 0; |
---|
448 |
char buff[1024]; |
---|
449 |
conn_q *cq = NULL; |
---|
450 |
|
---|
451 |
d = calloc(1, sizeof(*d)); |
---|
452 |
GET_QUERY(check_loadall); |
---|
453 |
cq = get_conn_q_for_remote(NULL,NULL,sn->fqdn,sn->dsn); |
---|
454 |
i = 0; |
---|
455 |
while(stratcon_database_connect(cq)) { |
---|
456 |
if(i++ > 4) { |
---|
457 |
noitL(noit_error, "giving up on storage node: %s\n", sn->fqdn); |
---|
458 |
release_conn_q(cq); |
---|
459 |
return (void *)(vpsized_int)good; |
---|
460 |
} |
---|
461 |
sleep(1); |
---|
462 |
} |
---|
463 |
PG_EXEC(check_loadall); |
---|
464 |
row_count = PQntuples(d->res); |
---|
465 |
|
---|
466 |
for(i=0; i<row_count; i++) { |
---|
467 |
int rv; |
---|
468 |
int8_t family; |
---|
469 |
struct sockaddr *sin; |
---|
470 |
struct sockaddr_in sin4 = { .sin_family = AF_INET }; |
---|
471 |
struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 }; |
---|
472 |
char *remote, *id, *target, *module, *name; |
---|
473 |
PG_GET_STR_COL(remote, i, "remote_address"); |
---|
474 |
PG_GET_STR_COL(id, i, "id"); |
---|
475 |
PG_GET_STR_COL(target, i, "target"); |
---|
476 |
PG_GET_STR_COL(module, i, "module"); |
---|
477 |
PG_GET_STR_COL(name, i, "name"); |
---|
478 |
snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name); |
---|
479 |
|
---|
480 |
family = AF_INET; |
---|
481 |
sin = (struct sockaddr *)&sin4; |
---|
482 |
rv = inet_pton(family, remote, &sin4.sin_addr); |
---|
483 |
if(rv != 1) { |
---|
484 |
family = AF_INET6; |
---|
485 |
sin = (struct sockaddr *)&sin6; |
---|
486 |
rv = inet_pton(family, remote, &sin6.sin6_addr); |
---|
487 |
if(rv != 1) { |
---|
488 |
noitL(noit_stderr, "Cannot translate '%s' to IP\n", remote); |
---|
489 |
sin = NULL; |
---|
490 |
} |
---|
491 |
} |
---|
492 |
|
---|
493 |
/* stratcon_iep_line_processor takes an allocated operand and frees it */ |
---|
494 |
stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL); |
---|
495 |
good++; |
---|
496 |
} |
---|
497 |
noitL(noit_error, "Staged %d/%d remembered checks from %s into IEP\n", |
---|
498 |
good, row_count, sn->fqdn); |
---|
499 |
bad_row: |
---|
500 |
free_params((ds_single_detail *)d); |
---|
501 |
free(d); |
---|
502 |
if(cq) release_conn_q(cq); |
---|
503 |
return (void *)(vpsized_int)good; |
---|
504 |
} |
---|
505 |
static int |
---|
506 |
stratcon_ingest_asynch_drive_iep(eventer_t e, int mask, void *closure, |
---|
507 |
struct timeval *now) { |
---|
508 |
storagenode_info self = { 0, NULL, NULL }, **sns = NULL; |
---|
509 |
pthread_t *jobs = NULL; |
---|
510 |
int nodes, i = 0, tcnt = 0; |
---|
511 |
if(!(mask & EVENTER_ASYNCH_WORK)) return 0; |
---|
512 |
if(mask & EVENTER_ASYNCH_CLEANUP) return 0; |
---|
513 |
|
---|
514 |
pthread_mutex_lock(&storagenode_to_info_cache_lock); |
---|
515 |
nodes = storagenode_to_info_cache.size; |
---|
516 |
jobs = calloc(MAX(1,nodes), sizeof(*jobs)); |
---|
517 |
sns = calloc(MAX(1,nodes), sizeof(*sns)); |
---|
518 |
if(nodes == 0) sns[nodes++] = &self; |
---|
519 |
else { |
---|
520 |
noit_hash_iter iter = NOIT_HASH_ITER_ZERO; |
---|
521 |
const char *k; |
---|
522 |
void *v; |
---|
523 |
int klen; |
---|
524 |
while(noit_hash_next(&storagenode_to_info_cache, |
---|
525 |
&iter, &k, &klen, &v)) { |
---|
526 |
sns[i++] = (storagenode_info *)v; |
---|
527 |
} |
---|
528 |
} |
---|
529 |
pthread_mutex_unlock(&storagenode_to_info_cache_lock); |
---|
530 |
|
---|
531 |
for(i=0; i<nodes; i++) { |
---|
532 |
if(pthread_create(&jobs[i], NULL, |
---|
533 |
stratcon_ingest_check_loadall, sns[i]) != 0) { |
---|
534 |
noitL(noit_error, "Failed to spawn thread: %s\n", strerror(errno)); |
---|
535 |
} |
---|
536 |
} |
---|
537 |
for(i=0; i<nodes; i++) { |
---|
538 |
void *good; |
---|
539 |
pthread_join(jobs[i], &good); |
---|
540 |
tcnt += (int)(vpsized_int)good; |
---|
541 |
} |
---|
542 |
free(jobs); |
---|
543 |
free(sns); |
---|
544 |
noitL(noit_error, "Loaded all %d check states.\n", tcnt); |
---|
545 |
return 0; |
---|
546 |
} |
---|
547 |
static void |
---|
548 |
stratcon_ingest_iep_check_preload() { |
---|
549 |
eventer_t e; |
---|
550 |
conn_pool *cpool; |
---|
551 |
|
---|
552 |
cpool = get_conn_pool_for_remote(NULL,NULL,NULL); |
---|
553 |
e = eventer_alloc(); |
---|
554 |
e->mask = EVENTER_ASYNCH; |
---|
555 |
e->callback = stratcon_ingest_asynch_drive_iep; |
---|
556 |
e->closure = NULL; |
---|
557 |
eventer_add_asynch(cpool->jobq, e); |
---|
558 |
} |
---|
559 |
execute_outcome_t |
---|
560 |
stratcon_ingest_find(ds_rt_detail *d) { |
---|
561 |
conn_q *cq; |
---|
562 |
char *val; |
---|
563 |
int row_count; |
---|
564 |
struct realtime_tracker *node; |
---|
565 |
|
---|
566 |
for(node = d->rt; node; node = node->next) { |
---|
567 |
char uuid_str[UUID_STR_LEN+1]; |
---|
568 |
const char *fqdn, *dsn, *remote_cn; |
---|
569 |
char remote_ip[32]; |
---|
570 |
int storagenode_id; |
---|
571 |
|
---|
572 |
uuid_unparse_lower(node->checkid, uuid_str); |
---|
573 |
if(storage_node_quick_lookup(uuid_str, NULL, &node->sid, |
---|
574 |
&storagenode_id, &remote_cn, &fqdn, &dsn)) |
---|
575 |
continue; |
---|
576 |
|
---|
577 |
noitL(noit_debug, "stratcon_ingest_find <- (%d, %s) @ %s\n", |
---|
578 |
node->sid, remote_cn ? remote_cn : "(null)", dsn ? dsn : "(null)"); |
---|
579 |
|
---|
580 |
/* We might be able to find the IP from our config if someone has |
---|
581 |
* specified the expected cn in the noit definition. |
---|
582 |
*/ |
---|
583 |
if(stratcon_find_noit_ip_by_cn(remote_cn, |
---|
584 |
remote_ip, sizeof(remote_ip)) == 0) { |
---|
585 |
node->noit = strdup(remote_ip); |
---|
586 |
noitL(noit_debug, "lookup(cache): %s -> %s\n", remote_cn, node->noit); |
---|
587 |
continue; |
---|
588 |
} |
---|
589 |
|
---|
590 |
cq = get_conn_q_for_remote(NULL, remote_cn, fqdn, dsn); |
---|
591 |
stratcon_database_connect(cq); |
---|
592 |
|
---|
593 |
GET_QUERY(check_find); |
---|
594 |
DECLARE_PARAM_INT(node->sid); |
---|
595 |
PG_EXEC(check_find); |
---|
596 |
row_count = PQntuples(d->res); |
---|
597 |
if(row_count != 1) { |
---|
598 |
noitL(noit_debug, "lookup (sid:%d): NOT THERE!\n", node->sid); |
---|
599 |
PQclear(d->res); |
---|
600 |
goto bad_row; |
---|
601 |
} |
---|
602 |
|
---|
603 |
/* Get the remote_address (which noit owns this) */ |
---|
604 |
PG_GET_STR_COL(val, 0, "remote_address"); |
---|
605 |
if(!val) { |
---|
606 |
noitL(noit_debug, "lookup: %s -> NOT THERE!\n", remote_cn); |
---|
607 |
PQclear(d->res); |
---|
608 |
goto bad_row; |
---|
609 |
} |
---|
610 |
node->noit = strdup(val); |
---|
611 |
noitL(noit_debug, "lookup: %s -> %s\n", remote_cn, node->noit); |
---|
612 |
bad_row: |
---|
613 |
free_params((ds_single_detail *)d); |
---|
614 |
d->nparams = 0; |
---|
615 |
release_conn_q(cq); |
---|
616 |
} |
---|
617 |
return DS_EXEC_SUCCESS; |
---|
618 |
} |
---|
619 |
execute_outcome_t |
---|
620 |
stratcon_ingest_execute(conn_q *cq, const char *r, const char *remote_cn, |
---|
621 |
ds_line_detail *d) { |
---|
622 |
int type, len, sid; |
---|
623 |
char *final_buff; |
---|
624 |
uLong final_len, actual_final_len; |
---|
625 |
char *token; |
---|
626 |
char raddr_blank[1] = ""; |
---|
627 |
const char *raddr; |
---|
628 |
|
---|
629 |
type = d->data[0]; |
---|
630 |
raddr = r ? r : raddr_blank; |
---|
631 |
|
---|
632 |
/* Parse the log line, but only if we haven't already */ |
---|
633 |
if(!d->nparams) { |
---|
634 |
char *scp, *ecp; |
---|
635 |
|
---|
636 |
scp = d->data; |
---|
637 |
#define PROCESS_NEXT_FIELD(t,l) do { \ |
---|
638 |
if(!*scp) goto bad_row; \ |
---|
639 |
ecp = strchr(scp, '\t'); \ |
---|
640 |
if(!ecp) goto bad_row; \ |
---|
641 |
token = scp; \ |
---|
642 |
len = (ecp-scp); \ |
---|
643 |
scp = ecp + 1; \ |
---|
644 |
} while(0) |
---|
645 |
#define PROCESS_LAST_FIELD(t,l) do { \ |
---|
646 |
if(!*scp) ecp = scp; \ |
---|
647 |
else { \ |
---|
648 |
ecp = scp + strlen(scp); /* Puts us at the '\0' */ \ |
---|
649 |
if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \ |
---|
650 |
} \ |
---|
651 |
t = scp; \ |
---|
652 |
l = (ecp-scp); \ |
---|
653 |
} while(0) |
---|
654 |
|
---|
655 |
PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */ |
---|
656 |
switch(type) { |
---|
657 |
/* See noit_check_log.c for log description */ |
---|
658 |
case 'n': |
---|
659 |
DECLARE_PARAM_STR(raddr, strlen(raddr)); |
---|
660 |
DECLARE_PARAM_STR(remote_cn, strlen(remote_cn)); |
---|
661 |
DECLARE_PARAM_STR("noitd",5); /* node_type */ |
---|
662 |
PROCESS_NEXT_FIELD(token,len); |
---|
663 |
d->whence = (time_t)strtoul(token, NULL, 10); |
---|
664 |
DECLARE_PARAM_STR(token,len); /* timestamp */ |
---|
665 |
|
---|
666 |
/* This is the expected uncompressed len */ |
---|
667 |
PROCESS_NEXT_FIELD(token,len); |
---|
668 |
final_len = atoi(token); |
---|
669 |
final_buff = malloc(final_len); |
---|
670 |
if(!final_buff) goto bad_row; |
---|
671 |
|
---|
672 |
/* The last token is b64 endoded and compressed. |
---|
673 |
* we need to decode it, declare it and then free it. |
---|
674 |
*/ |
---|
675 |
PROCESS_LAST_FIELD(token, len); |
---|
676 |
/* We can in-place decode this */ |
---|
677 |
len = noit_b64_decode((char *)token, len, |
---|
678 |
(unsigned char *)token, len); |
---|
679 |
if(len <= 0) { |
---|
680 |
noitL(noit_error, "noitd config base64 decoding error.\n"); |
---|
681 |
free(final_buff); |
---|
682 |
goto bad_row; |
---|
683 |
} |
---|
684 |
actual_final_len = final_len; |
---|
685 |
if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len, |
---|
686 |
(unsigned char *)token, len)) { |
---|
687 |
noitL(noit_error, "noitd config decompression failure.\n"); |
---|
688 |
free(final_buff); |
---|
689 |
goto bad_row; |
---|
690 |
} |
---|
691 |
if(final_len != actual_final_len) { |
---|
692 |
noitL(noit_error, "noitd config decompression error.\n"); |
---|
693 |
free(final_buff); |
---|
694 |
goto bad_row; |
---|
695 |
} |
---|
696 |
DECLARE_PARAM_STR(final_buff, final_len); |
---|
697 |
free(final_buff); |
---|
698 |
break; |
---|
699 |
case 'D': |
---|
700 |
break; |
---|
701 |
case 'C': |
---|
702 |
DECLARE_PARAM_STR(raddr, strlen(raddr)); |
---|
703 |
PROCESS_NEXT_FIELD(token,len); |
---|
704 |
DECLARE_PARAM_STR(token,len); /* timestamp */ |
---|
705 |
d->whence = (time_t)strtoul(token, NULL, 10); |
---|
706 |
PROCESS_NEXT_FIELD(token, len); |
---|
707 |
/* uuid is last 36 bytes */ |
---|
708 |
if(len > 36) { token += (len-36); len = 36; } |
---|
709 |
sid = uuid_to_sid(token, remote_cn); |
---|
710 |
if(sid == 0) goto bad_row; |
---|
711 |
DECLARE_PARAM_INT(sid); /* sid */ |
---|
712 |
DECLARE_PARAM_STR(token,len); /* uuid */ |
---|
713 |
PROCESS_NEXT_FIELD(token, len); |
---|
714 |
DECLARE_PARAM_STR(token,len); /* target */ |
---|
715 |
PROCESS_NEXT_FIELD(token, len); |
---|
716 |
DECLARE_PARAM_STR(token,len); /* module */ |
---|
717 |
PROCESS_LAST_FIELD(token, len); |
---|
718 |
DECLARE_PARAM_STR(token,len); /* name */ |
---|
719 |
break; |
---|
720 |
case 'M': |
---|
721 |
PROCESS_NEXT_FIELD(token,len); |
---|
722 |
DECLARE_PARAM_STR(token,len); /* timestamp */ |
---|
723 |
d->whence = (time_t)strtoul(token, NULL, 10); |
---|
724 |
PROCESS_NEXT_FIELD(token, len); |
---|
725 |
/* uuid is last 36 bytes */ |
---|
726 |
if(len > 36) { token += (len-36); len = 36; } |
---|
727 |
sid = uuid_to_sid(token, remote_cn); |
---|
728 |
if(sid == 0) goto bad_row; |
---|
729 |
DECLARE_PARAM_INT(sid); /* sid */ |
---|
730 |
PROCESS_NEXT_FIELD(token, len); |
---|
731 |
DECLARE_PARAM_STR(token,len); /* name */ |
---|
732 |
PROCESS_NEXT_FIELD(token,len); |
---|
733 |
d->metric_type = *token; |
---|
734 |
PROCESS_LAST_FIELD(token,len); |
---|
735 |
DECLARE_PARAM_STR(token,len); /* value */ |
---|
736 |
break; |
---|
737 |
case 'S': |
---|
738 |
PROCESS_NEXT_FIELD(token,len); |
---|
739 |
DECLARE_PARAM_STR(token,len); /* timestamp */ |
---|
740 |
d->whence = (time_t)strtoul(token, NULL, 10); |
---|
741 |
PROCESS_NEXT_FIELD(token, len); |
---|
742 |
/* uuid is last 36 bytes */ |
---|
743 |
if(len > 36) { token += (len-36); len = 36; } |
---|
744 |
sid = uuid_to_sid(token, remote_cn); |
---|
745 |
if(sid == 0) goto bad_row; |
---|
746 |
DECLARE_PARAM_INT(sid); /* sid */ |
---|
747 |
PROCESS_NEXT_FIELD(token, len); |
---|
748 |
DECLARE_PARAM_STR(token,len); /* state */ |
---|
749 |
PROCESS_NEXT_FIELD(token, len); |
---|
750 |
DECLARE_PARAM_STR(token,len); /* availability */ |
---|
751 |
PROCESS_NEXT_FIELD(token, len); |
---|
752 |
DECLARE_PARAM_STR(token,len); /* duration */ |
---|
753 |
PROCESS_LAST_FIELD(token,len); |
---|
754 |
DECLARE_PARAM_STR(token,len); /* status */ |
---|
755 |
break; |
---|
756 |
default: |
---|
757 |
goto bad_row; |
---|
758 |
} |
---|
759 |
|
---|
760 |
} |
---|
761 |
|
---|
762 |
/* Now execute the query */ |
---|
763 |
switch(type) { |
---|
764 |
case 'n': |
---|
765 |
GET_QUERY(config_insert); |
---|
766 |
PG_EXEC(config_insert); |
---|
767 |
PQclear(d->res); |
---|
768 |
break; |
---|
769 |
case 'C': |
---|
770 |
GET_QUERY(check_insert); |
---|
771 |
PG_TM_EXEC(check_insert, d->whence); |
---|
772 |
PQclear(d->res); |
---|
773 |
break; |
---|
774 |
case 'S': |
---|
775 |
GET_QUERY(status_insert); |
---|
776 |
PG_TM_EXEC(status_insert, d->whence); |
---|
777 |
PQclear(d->res); |
---|
778 |
break; |
---|
779 |
case 'D': |
---|
780 |
break; |
---|
781 |
case 'M': |
---|
782 |
switch(d->metric_type) { |
---|
783 |
case METRIC_INT32: |
---|
784 |
case METRIC_UINT32: |
---|
785 |
case METRIC_INT64: |
---|
786 |
case METRIC_UINT64: |
---|
787 |
case METRIC_DOUBLE: |
---|
788 |
GET_QUERY(metric_insert_numeric); |
---|
789 |
PG_TM_EXEC(metric_insert_numeric, d->whence); |
---|
790 |
PQclear(d->res); |
---|
791 |
break; |
---|
792 |
case METRIC_STRING: |
---|
793 |
GET_QUERY(metric_insert_text); |
---|
794 |
PG_TM_EXEC(metric_insert_text, d->whence); |
---|
795 |
PQclear(d->res); |
---|
796 |
break; |
---|
797 |
default: |
---|
798 |
goto bad_row; |
---|
799 |
} |
---|
800 |
break; |
---|
801 |
default: |
---|
802 |
/* should never get here */ |
---|
803 |
goto bad_row; |
---|
804 |
} |
---|
805 |
return DS_EXEC_SUCCESS; |
---|
806 |
bad_row: |
---|
807 |
return DS_EXEC_ROW_FAILED; |
---|
808 |
} |
---|
809 |
static int |
---|
810 |
stratcon_database_post_connect(conn_q *cq) { |
---|
811 |
int rv = 0; |
---|
812 |
ds_single_detail _d = { 0 }, *d = &_d; |
---|
813 |
if(cq->fqdn) { |
---|
814 |
char *remote_str, *remote_cn; |
---|
815 |
/* This is the silly way we get null's in through our declare_param_str */ |
---|
816 |
remote_str = cq->remote_str ? cq->remote_str : "[[null]]"; |
---|
817 |
remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]"; |
---|
818 |
/* This is a storage node, it gets the storage node post_connect */ |
---|
819 |
GET_QUERY(storage_post_connect); |
---|
820 |
rv = -1; /* now we're serious */ |
---|
821 |
DECLARE_PARAM_STR(remote_str, strlen(remote_str)); |
---|
822 |
DECLARE_PARAM_STR(remote_cn, strlen(remote_cn)); |
---|
823 |
PG_EXEC(storage_post_connect); |
---|
824 |
PQclear(d->res); |
---|
825 |
rv = 0; |
---|
826 |
} |
---|
827 |
else { |
---|
828 |
/* Metanode post_connect */ |
---|
829 |
GET_QUERY(metanode_post_connect); |
---|
830 |
rv = -1; /* now we're serious */ |
---|
831 |
PG_EXEC(metanode_post_connect); |
---|
832 |
PQclear(d->res); |
---|
833 |
rv = 0; |
---|
834 |
} |
---|
835 |
bad_row: |
---|
836 |
free_params(d); |
---|
837 |
if(rv == -1) { |
---|
838 |
/* Post-connect intentions are serious and fatal */ |
---|
839 |
PQfinish(cq->dbh); |
---|
840 |
cq->dbh = NULL; |
---|
841 |
} |
---|
842 |
return rv; |
---|
843 |
} |
---|
844 |
static int |
---|
845 |
stratcon_database_connect(conn_q *cq) { |
---|
846 |
char *dsn, dsn_meta[512]; |
---|
847 |
noit_hash_iter iter = NOIT_HASH_ITER_ZERO; |
---|
848 |
const char *k, *v; |
---|
849 |
int klen; |
---|
850 |
noit_hash_table *t; |
---|
851 |
|
---|
852 |
dsn_meta[0] = '\0'; |
---|
853 |
if(!cq->dsn) { |
---|
854 |
t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig"); |
---|
855 |
while(noit_hash_next_str(t, &iter, &k, &klen, &v)) { |
---|
856 |
if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta)); |
---|
857 |
strlcat(dsn_meta, k, sizeof(dsn_meta)); |
---|
858 |
strlcat(dsn_meta, "=", sizeof(dsn_meta)); |
---|
859 |
strlcat(dsn_meta, v, sizeof(dsn_meta)); |
---|
860 |
} |
---|
861 |
noit_hash_destroy(t, free, free); |
---|
862 |
free(t); |
---|
863 |
dsn = dsn_meta; |
---|
864 |
} |
---|
865 |
else { |
---|
866 |
char options[32]; |
---|
867 |
strlcpy(dsn_meta, cq->dsn, sizeof(dsn_meta)); |
---|
868 |
if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/user", |
---|
869 |
options, sizeof(options))) { |
---|
870 |
strlcat(dsn_meta, " ", sizeof(dsn_meta)); |
---|
871 |
strlcat(dsn_meta, "user", sizeof(dsn_meta)); |
---|
872 |
strlcat(dsn_meta, "=", sizeof(dsn_meta)); |
---|
873 |
strlcat(dsn_meta, options, sizeof(dsn_meta)); |
---|
874 |
} |
---|
875 |
if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/password", |
---|
876 |
options, sizeof(options))) { |
---|
877 |
strlcat(dsn_meta, " ", sizeof(dsn_meta)); |
---|
878 |
strlcat(dsn_meta, "password", sizeof(dsn_meta)); |
---|
879 |
strlcat(dsn_meta, "=", sizeof(dsn_meta)); |
---|
880 |
strlcat(dsn_meta, options, sizeof(dsn_meta)); |
---|
881 |
} |
---|
882 |
dsn = dsn_meta; |
---|
883 |
} |
---|
884 |
|
---|
885 |
if(cq->dbh) { |
---|
886 |
if(PQstatus(cq->dbh) == CONNECTION_OK) return 0; |
---|
887 |
PQreset(cq->dbh); |
---|
888 |
if(PQstatus(cq->dbh) != CONNECTION_OK) { |
---|
889 |
noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n", |
---|
890 |
dsn, PQerrorMessage(cq->dbh)); |
---|
891 |
return -1; |
---|
892 |
} |
---|
893 |
if(stratcon_database_post_connect(cq)) return -1; |
---|
894 |
if(PQstatus(cq->dbh) == CONNECTION_OK) return 0; |
---|
895 |
noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n", |
---|
896 |
dsn, PQerrorMessage(cq->dbh)); |
---|
897 |
return -1; |
---|
898 |
} |
---|
899 |
|
---|
900 |
cq->dbh = PQconnectdb(dsn); |
---|
901 |
if(!cq->dbh) return -1; |
---|
902 |
if(PQstatus(cq->dbh) != CONNECTION_OK) { |
---|
903 |
noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n", |
---|
904 |
dsn, PQerrorMessage(cq->dbh)); |
---|
905 |
return -1; |
---|
906 |
} |
---|
907 |
if(stratcon_database_post_connect(cq)) return -1; |
---|
908 |
if(PQstatus(cq->dbh) == CONNECTION_OK) return 0; |
---|
909 |
noitL(noit_error, "Error connection to database: '%s'\nError: %s\n", |
---|
910 |
dsn, PQerrorMessage(cq->dbh)); |
---|
911 |
return -1; |
---|
912 |
} |
---|
913 |
static int |
---|
914 |
stratcon_ingest_savepoint_op(conn_q *cq, const char *p, |
---|
915 |
const char *name) { |
---|
916 |
int rv = -1; |
---|
917 |
PGresult *res; |
---|
918 |
char cmd[128]; |
---|
919 |
strlcpy(cmd, p, sizeof(cmd)); |
---|
920 |
strlcat(cmd, name, sizeof(cmd)); |
---|
921 |
if((res = PQexec(cq->dbh, cmd)) == NULL) return -1; |
---|
922 |
if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0; |
---|
923 |
PQclear(res); |
---|
924 |
return rv; |
---|
925 |
} |
---|
926 |
static int |
---|
927 |
stratcon_ingest_do(conn_q *cq, const char *cmd) { |
---|
928 |
PGresult *res; |
---|
929 |
int rv = -1; |
---|
930 |
if((res = PQexec(cq->dbh, cmd)) == NULL) return -1; |
---|
931 |
if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0; |
---|
932 |
PQclear(res); |
---|
933 |
return rv; |
---|
934 |
} |
---|
935 |
/*
 * Transaction helpers for the batch ingestion loop.  They are not
 * self-contained: the expanding function must provide a `full_monty` label
 * and the locals `cq`, `current` and `last_sp`.
 *
 * BUSTED(cq)                 close the connection, clear the handle and
 *                            restart from full_monty.
 * SAVEPOINT(name)            create the savepoint and remember the current
 *                            row in last_sp; BUSTED on failure.
 * ROLLBACK_TO_SAVEPOINT(name)
 * RELEASE_SAVEPOINT(name)    issue the statement and clear last_sp;
 *                            BUSTED on failure.
 */
#define BUSTED(cq) do {                                                   \
  PQfinish((cq)->dbh);                                                    \
  (cq)->dbh = NULL;                                                       \
  goto full_monty;                                                        \
} while(0)

#define SAVEPOINT(name) do {                                              \
  if(stratcon_ingest_savepoint_op(cq, "SAVEPOINT ", name)) {              \
    BUSTED(cq);                                                           \
  }                                                                       \
  last_sp = current;                                                      \
} while(0)

#define ROLLBACK_TO_SAVEPOINT(name) do {                                  \
  if(stratcon_ingest_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name)) {  \
    BUSTED(cq);                                                           \
  }                                                                       \
  last_sp = NULL;                                                         \
} while(0)

#define RELEASE_SAVEPOINT(name) do {                                      \
  if(stratcon_ingest_savepoint_op(cq, "RELEASE SAVEPOINT ", name)) {      \
    BUSTED(cq);                                                           \
  }                                                                       \
  last_sp = NULL;                                                         \
} while(0)
---|
954 |
|
---|
955 |
int |
---|
956 |
stratcon_ingest_asynch_lookup(eventer_t e, int mask, void *closure, |
---|
957 |
struct timeval *now) { |
---|
958 |
ds_rt_detail *dsjd = closure; |
---|
959 |
if(!(mask & EVENTER_ASYNCH_WORK)) return 0; |
---|
960 |
if(mask & EVENTER_ASYNCH_CLEANUP) return 0; |
---|
961 |
|
---|
962 |
assert(dsjd->rt); |
---|
963 |
stratcon_ingest_find(dsjd); |
---|
964 |
if(dsjd->completion_event) |
---|
965 |
eventer_add(dsjd->completion_event); |
---|
966 |
|
---|
967 |
free_params((ds_single_detail *)dsjd); |
---|
968 |
free(dsjd); |
---|
969 |
return 0; |
---|
970 |
} |
---|
971 |
static void |
---|
972 |
stratcon_ingestor_submit_lookup(struct realtime_tracker *rt, |
---|
973 |
eventer_t completion) { |
---|
974 |
eventer_t e; |
---|
975 |
conn_pool *cpool; |
---|
976 |
ds_rt_detail *rtdetail; |
---|
977 |
|
---|
978 |
cpool = get_conn_pool_for_remote(NULL,NULL,NULL); |
---|
979 |
rtdetail = calloc(1, sizeof(*rtdetail)); |
---|
980 |
rtdetail->rt = rt; |
---|
981 |
rtdetail->completion_event = completion; |
---|
982 |
e = eventer_alloc(); |
---|
983 |
e->mask = EVENTER_ASYNCH; |
---|
984 |
e->callback = stratcon_ingest_asynch_lookup; |
---|
985 |
e->closure = rtdetail; |
---|
986 |
eventer_add_asynch(cpool->jobq, e); |
---|
987 |
} |
---|
988 |
static const char * |
---|
989 |
get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) { |
---|
990 |
void *vinfo; |
---|
991 |
char *dsn = NULL, *fqdn = NULL; |
---|
992 |
int found = 0; |
---|
993 |
storagenode_info *info = NULL; |
---|
994 |
pthread_mutex_lock(&storagenode_to_info_cache_lock); |
---|
995 |
if(noit_hash_retrieve(&storagenode_to_info_cache, (void *)&id, sizeof(id), |
---|
996 |
&vinfo)) { |
---|
997 |
found = 1; |
---|
998 |
info = vinfo; |
---|
999 |
} |
---|
1000 |
pthread_mutex_unlock(&storagenode_to_info_cache_lock); |
---|
1001 |
if(found) { |
---|
1002 |
if(fqdn_out) *fqdn_out = info->fqdn; |
---|
1003 |
return info->dsn; |
---|
1004 |
} |
---|
1005 |
|
---|
1006 |
if(!found && can_use_db) { |
---|
1007 |
ds_single_detail *d; |
---|
1008 |
conn_q *cq; |
---|
1009 |
int row_count; |
---|
1010 |
/* Look it up and store it */ |
---|
1011 |
d = calloc(1, sizeof(*d)); |
---|
1012 |
cq = get_conn_q_for_metanode(); |
---|
1013 |
GET_QUERY(find_storage); |
---|
1014 |
DECLARE_PARAM_INT(id); |
---|
1015 |
PG_EXEC(find_storage); |
---|
1016 |
row_count = PQntuples(d->res); |
---|
1017 |
if(row_count) { |
---|
1018 |
PG_GET_STR_COL(dsn, 0, "dsn"); |
---|
1019 |
PG_GET_STR_COL(fqdn, 0, "fqdn"); |
---|
1020 |
fqdn = fqdn ? strdup(fqdn) : NULL; |
---|
1021 |
dsn = dsn ? strdup(dsn) : NULL; |
---|
1022 |
} |
---|
1023 |
PQclear(d->res); |
---|
1024 |
bad_row: |
---|
1025 |
free_params(d); |
---|
1026 |
free(d); |
---|
1027 |
release_conn_q(cq); |
---|
1028 |
} |
---|
1029 |
if(fqdn) { |
---|
1030 |
info = calloc(1, sizeof(*info)); |
---|
1031 |
info->fqdn = fqdn; |
---|
1032 |
if(fqdn_out) *fqdn_out = info->fqdn; |
---|
1033 |
info->dsn = dsn; |
---|
1034 |
info->storagenode_id = id; |
---|
1035 |
pthread_mutex_lock(&storagenode_to_info_cache_lock); |
---|
1036 |
noit_hash_store(&storagenode_to_info_cache, |
---|
1037 |
(void *)&info->storagenode_id, sizeof(int), info); |
---|
1038 |
pthread_mutex_unlock(&storagenode_to_info_cache_lock); |
---|
1039 |
} |
---|
1040 |
return info ? info->dsn : NULL; |
---|
1041 |
} |
---|
1042 |
static void |
---|
1043 |
expand_b_record(ds_line_detail **head, ds_line_detail **last, |
---|
1044 |
const char *line, int len) { |
---|
1045 |
char **outrows; |
---|
1046 |
int i, cnt; |
---|
1047 |
ds_line_detail *next; |
---|
1048 |
|
---|
1049 |
cnt = noit_check_log_b_to_sm(line, len, &outrows); |
---|
1050 |
for(i=0;i<cnt;i++) { |
---|
1051 |
if(outrows[i] == NULL) continue; |
---|
1052 |
next = calloc(sizeof(*next), 1); |
---|
1053 |
next->data = outrows[i]; |
---|
1054 |
if(!*head) *head = next; |
---|
1055 |
if(*last) (*last)->next = next; |
---|
1056 |
*last = next; |
---|
1057 |
} |
---|
1058 |
if(outrows) free(outrows); |
---|
1059 |
} |
---|
1060 |
static ds_line_detail * |
---|
1061 |
build_insert_batch(pg_interim_journal_t *ij) { |
---|
1062 |
int rv; |
---|
1063 |
off_t len; |
---|
1064 |
const char *buff, *cp, *lcp; |
---|
1065 |
struct stat st; |
---|
1066 |
ds_line_detail *head = NULL, *last = NULL, *next = NULL; |
---|
1067 |
|
---|
1068 |
if(ij->fd < 0) { |
---|
1069 |
ij->fd = open(ij->filename, O_RDONLY); |
---|
1070 |
if(ij->fd < 0) { |
---|
1071 |
noitL(noit_error, "Cannot open interim journal '%s': %s\n", |
---|
1072 |
ij->filename, strerror(errno)); |
---|
1073 |
assert(ij->fd >= 0); |
---|
1074 |
} |
---|
1075 |
} |
---|
1076 |
while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR); |
---|
1077 |
if(rv == -1) { |
---|
1078 |
noitL(noit_error, "Cannot stat interim journal '%s': %s\n", |
---|
1079 |
ij->filename, strerror(errno)); |
---|
1080 |
assert(rv != -1); |
---|
1081 |
} |
---|
1082 |
len = st.st_size; |
---|
1083 |
if(len > 0) { |
---|
1084 |
buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0); |
---|
1085 |
if(buff == (void *)-1) { |
---|
1086 |
noitL(noit_error, "mmap(%d, %d)(%s) => %s\n", (int)len, ij->fd, |
---|
1087 |
ij->filename, strerror(errno)); |
---|
1088 |
assert(buff != (void *)-1); |
---|
1089 |
} |
---|
1090 |
lcp = buff; |
---|
1091 |
while(lcp < (buff + len) && |
---|
1092 |
NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) { |
---|
1093 |
if(lcp[0] == 'B' && lcp[1] != '\0' && lcp[2] == '\t') { |
---|
1094 |
/* Bundle records are special and need to be expanded into |
---|
1095 |
* traditional records here |
---|
1096 |
*/ |
---|
1097 |
noit_compression_type_t ctype = NOIT_COMPRESS_NONE; |
---|
1098 |
switch(lcp[1]) { |
---|
1099 |
case '1': /* version 1 */ |
---|
1100 |
ctype = NOIT_COMPRESS_ZLIB; /*no break fall through */ |
---|
1101 |
case '2': /* version 2 */ |
---|
1102 |
expand_b_record(&head, &last, lcp, cp - lcp); |
---|
1103 |
break; |
---|
1104 |
default: |
---|
1105 |
noitL(noit_error, "unknown bundle version %c\n", lcp[1]); |
---|
1106 |
} |
---|
1107 |
} |
---|
1108 |
else { |
---|
1109 |
next = calloc(1, sizeof(*next)); |
---|
1110 |
next->data = malloc(cp - lcp + 1); |
---|
1111 |
memcpy(next->data, lcp, cp - lcp); |
---|
1112 |
next->data[cp - lcp] = '\0'; |
---|
1113 |
if(!head) head = next; |
---|
1114 |
if(last) last->next = next; |
---|
1115 |
last = next; |
---|
1116 |
} |
---|
1117 |
lcp = cp + 1; |
---|
1118 |
} |
---|
1119 |
munmap((void *)buff, len); |
---|
1120 |
} |
---|
1121 |
close(ij->fd); |
---|
1122 |
return head; |
---|
1123 |
} |
---|
1124 |
static void |
---|
1125 |
pg_interim_journal_remove(pg_interim_journal_t *ij) { |
---|
1126 |
unlink(ij->filename); |
---|
1127 |
if(ij->filename) free(ij->filename); |
---|
1128 |
if(ij->remote_str) free(ij->remote_str); |
---|
1129 |
if(ij->remote_cn) free(ij->remote_cn); |
---|
1130 |
if(ij->fqdn) free(ij->fqdn); |
---|
1131 |
free(ij); |
---|
1132 |
} |
---|
1133 |
static int |
---|
1134 |
stratcon_ingest_asynch_execute(eventer_t e, int mask, void *closure, |
---|
1135 |
struct timeval *now) { |
---|
1136 |
int i, total, success, sp_total, sp_success; |
---|
1137 |
pg_interim_journal_t *ij; |
---|
1138 |
ds_line_detail *head = NULL, *current, *last_sp; |
---|
1139 |
const char *dsn; |
---|
1140 |
conn_q *cq; |
---|
1141 |
if(!(mask & EVENTER_ASYNCH_WORK)) return 0; |
---|
1142 |
if(mask & EVENTER_ASYNCH_CLEANUP) return 0; |
---|
1143 |
|
---|
1144 |
ij = closure; |
---|
1145 |
if(ij->fqdn == NULL) { |
---|
1146 |
dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, &ij->fqdn); |
---|
1147 |
if(ij->fqdn) ij->fqdn = strdup(ij->fqdn); /* fqdn is now ours */ |
---|
1148 |
} |
---|
1149 |
else { |
---|
1150 |
dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, NULL); |
---|
1151 |
} |
---|
1152 |
cq = get_conn_q_for_remote(ij->remote_str, ij->remote_cn, |
---|
1153 |
ij->fqdn, dsn); |
---|
1154 |
noitL(ds_deb, "stratcon_ingest_asynch_execute[%s,%s,%s]\n", |
---|
1155 |
ij->remote_str, ij->remote_cn, ij->fqdn); |
---|
1156 |
full_monty: |
---|
1157 |
/* Make sure we have a connection */ |
---|
1158 |
i = 1; |
---|
1159 |
while(stratcon_database_connect(cq)) { |
---|
1160 |
noitL(noit_error, "Error connecting to database: %s\n", |
---|
1161 |
ij->fqdn ? ij->fqdn : "(null)"); |
---|
1162 |
sleep(i); |
---|
1163 |
i *= 2; |
---|
1164 |
i = MIN(i, 16); |
---|
1165 |
} |
---|
1166 |
|
---|
1167 |
if(head == NULL) head = build_insert_batch(ij); |
---|
1168 |
noitL(ds_deb, "Starting batch from %s/%s to %s\n", |
---|
1169 |
ij->remote_str ? ij->remote_str : "(null)", |
---|
1170 |
ij->remote_cn ? ij->remote_cn : "(null)", |
---|
1171 |
ij->fqdn ? ij->fqdn : "(null)"); |
---|
1172 |
current = head; |
---|
1173 |
last_sp = NULL; |
---|
1174 |
total = success = sp_total = sp_success = 0; |
---|
1175 |
if(stratcon_ingest_do(cq, "BEGIN")) BUSTED(cq); |
---|
1176 |
while(current) { |
---|
1177 |
execute_outcome_t rv; |
---|
1178 |
if(current->data) { |
---|
1179 |
if(!last_sp) { |
---|
1180 |
SAVEPOINT("batch"); |
---|
1181 |
sp_success = success; |
---|
1182 |
sp_total = total; |
---|
1183 |
} |
---|
1184 |
|
---|
1185 |
if(current->problematic) { |
---|
1186 |
RELEASE_SAVEPOINT("batch"); |
---|
1187 |
current = current->next; |
---|
1188 |
total++; |
---|
1189 |
continue; |
---|
1190 |
} |
---|
1191 |
rv = stratcon_ingest_execute(cq, cq->remote_str, cq->remote_cn, |
---|
1192 |
current); |
---|
1193 |
switch(rv) { |
---|
1194 |
case DS_EXEC_SUCCESS: |
---|
1195 |
total++; |
---|
1196 |
success++; |
---|
1197 |
current = current->next; |
---|
1198 |
break; |
---|
1199 |
case DS_EXEC_ROW_FAILED: |
---|
1200 |
/* rollback to savepoint, mark this record as bad and start again */ |
---|
1201 |
if(current->data[0] != 'n') |
---|
1202 |
noitL(ingest_err, "%d\t%s\n", ij->storagenode_id, current->data); |
---|
1203 |
current->problematic = 1; |
---|
1204 |
current = last_sp; |
---|
1205 |
success = sp_success; |
---|
1206 |
total = sp_total; |
---|
1207 |
ROLLBACK_TO_SAVEPOINT("batch"); |
---|
1208 |
break; |
---|
1209 |
case DS_EXEC_TXN_FAILED: |
---|
1210 |
noitL(noit_error, "txn failed '%s', retrying\n", ij->filename); |
---|
1211 |
BUSTED(cq); |
---|
1212 |
} |
---|
1213 |
} |
---|
1214 |
} |
---|
1215 |
if(last_sp) RELEASE_SAVEPOINT("batch"); |
---|
1216 |
if(stratcon_ingest_do(cq, "COMMIT")) { |
---|
1217 |
noitL(noit_error, "txn commit failed '%s', retrying\n", ij->filename); |
---|
1218 |
BUSTED(cq); |
---|
1219 |
} |
---|
1220 |
/* Cleanup the mess */ |
---|
1221 |
while(head) { |
---|
1222 |
ds_line_detail *tofree; |
---|
1223 |
tofree = head; |
---|
1224 |
head = head->next; |
---|
1225 |
if(tofree->data) free(tofree->data); |
---|
1226 |
free_params((ds_single_detail *)tofree); |
---|
1227 |
free(tofree); |
---|
1228 |
} |
---|
1229 |
noitL(ds_deb, "Finished batch %s/%s to %s [%d/%d]\n", |
---|
1230 |
ij->remote_str ? ij->remote_str : "(null)", |
---|
1231 |
ij->remote_cn ? ij->remote_cn : "(null)", |
---|
1232 |
ij->fqdn ? ij->fqdn : "(null)", success, total); |
---|
1233 |
pg_interim_journal_remove(ij); |
---|
1234 |
release_conn_q(cq); |
---|
1235 |
return 0; |
---|
1236 |
} |
---|
1237 |
static int |
---|
1238 |
storage_node_quick_lookup(const char *uuid_str, const char *remote_cn, |
---|
1239 |
int *sid_out, int *storagenode_id_out, |
---|
1240 |
const char **remote_cn_out, |
---|
1241 |
const char **fqdn_out, const char **dsn_out) { |
---|
1242 |
/* only called from the main thread -- no safety issues */ |
---|
1243 |
void *vuuidinfo, *vinfo; |
---|
1244 |
uuid_info *uuidinfo; |
---|
1245 |
storagenode_info *info = NULL; |
---|
1246 |
char *fqdn = NULL; |
---|
1247 |
char *dsn = NULL; |
---|
1248 |
char *new_remote_cn = NULL; |
---|
1249 |
int storagenode_id = 0, sid = 0; |
---|
1250 |
if(!noit_hash_retrieve(&uuid_to_info_cache, uuid_str, strlen(uuid_str), |
---|
1251 |
&vuuidinfo)) { |
---|
1252 |
int row_count = 0; |
---|
1253 |
char *tmpint; |
---|
1254 |
ds_single_detail *d; |
---|
1255 |
conn_q *cq; |
---|
1256 |
|
---|
1257 |
/* We can't do a database lookup without the remote_cn */ |
---|
1258 |
if(!remote_cn) { |
---|
1259 |
if(stratcon_datastore_get_enabled()) { |
---|
1260 |
/* We have an authoritatively maintained cache, we don't do lookups */ |
---|
1261 |
return -1; |
---|
1262 |
} |
---|
1263 |
else |
---|
1264 |
remote_cn = "[[null]]"; |
---|
1265 |
} |
---|
1266 |
|
---|
1267 |
d = calloc(1, sizeof(*d)); |
---|
1268 |
cq = get_conn_q_for_metanode(); |
---|
1269 |
if(stratcon_database_connect(cq) == 0) { |
---|
1270 |
/* Blocking call to service the cache miss */ |
---|
1271 |
GET_QUERY(check_map); |
---|
1272 |
DECLARE_PARAM_STR(uuid_str, strlen(uuid_str)); |
---|
1273 |
DECLARE_PARAM_STR(remote_cn, strlen(remote_cn)); |
---|
1274 |
PG_EXEC(check_map); |
---|
1275 |
row_count = PQntuples(d->res); |
---|
1276 |
if(row_count != 1) { |
---|
1277 |
PQclear(d->res); |
---|
1278 |
goto bad_row; |
---|
1279 |
} |
---|
1280 |
PG_GET_STR_COL(tmpint, 0, "sid"); |
---|
1281 |
if(!tmpint) { |
---|
1282 |
row_count = 0; |
---|
1283 |
PQclear(d->res); |
---|
1284 |
goto bad_row; |
---|
1285 |
} |
---|
1286 |
sid = atoi(tmpint); |
---|
1287 |
PG_GET_STR_COL(tmpint, 0, "storage_node_id"); |
---|
1288 |
if(tmpint) storagenode_id = atoi(tmpint); |
---|
1289 |
PG_GET_STR_COL(fqdn, 0, "fqdn"); |
---|
1290 |
PG_GET_STR_COL(dsn, 0, "dsn"); |
---|
1291 |
PG_GET_STR_COL(new_remote_cn, 0, "remote_cn"); |
---|
1292 |
fqdn = fqdn ? strdup(fqdn) : NULL; |
---|
1293 |
dsn = dsn ? strdup(dsn) : NULL; |
---|
1294 |
new_remote_cn = new_remote_cn ? strdup(new_remote_cn) : NULL; |
---|
1295 |
PQclear(d->res); |
---|
1296 |
} |
---|
1297 |
bad_row: |
---|
1298 |
free_params((ds_single_detail *)d); |
---|
1299 |
free(d); |
---|
1300 |
release_conn_q(cq); |
---|
1301 |
if(row_count != 1) { |
---|
1302 |
return -1; |
---|
1303 |
} |
---|
1304 |
/* Place in cache */ |
---|
1305 |
uuidinfo = calloc(1, sizeof(*uuidinfo)); |
---|
1306 |
uuidinfo->sid = sid; |
---|
1307 |
uuidinfo->uuid_str = strdup(uuid_str); |
---|
1308 |
uuidinfo->storagenode_id = storagenode_id; |
---|
1309 |
uuidinfo->remote_cn = new_remote_cn ? strdup(new_remote_cn) : strdup(remote_cn); |
---|
1310 |
noit_hash_store(&uuid_to_info_cache, |
---|
1311 |
uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo); |
---|
1312 |
/* Also, we may have just witnessed a new storage node, store it */ |
---|
1313 |
if(storagenode_id) { |
---|
1314 |
int needs_free = 0; |
---|
1315 |
info = calloc(1, sizeof(*info)); |
---|
1316 |
info->storagenode_id = storagenode_id; |
---|
1317 |
info->dsn = dsn ? strdup(dsn) : NULL; |
---|
1318 |
info->fqdn = fqdn ? strdup(fqdn) : NULL; |
---|
1319 |
pthread_mutex_lock(&storagenode_to_info_cache_lock); |
---|
1320 |
if(!noit_hash_retrieve(&storagenode_to_info_cache, |
---|
1321 |
(void *)&storagenode_id, sizeof(int), &vinfo)) { |
---|
1322 |
/* hack to save memory -- we *never* remove from these caches, |
---|
1323 |
so we can use the same fqdn value in the above cache for the key |
---|
1324 |
in the cache below -- (no strdup) */ |
---|
1325 |
noit_hash_store(&storagenode_to_info_cache, |
---|
1326 |
(void *)&info->storagenode_id, sizeof(int), info); |
---|
1327 |
} |
---|
1328 |
else needs_free = 1; |
---|
1329 |
pthread_mutex_unlock(&storagenode_to_info_cache_lock); |
---|
1330 |
if(needs_free) { |
---|
1331 |
if(info->dsn) free(info->dsn); |
---|
1332 |
if(info->fqdn) free(info->fqdn); |
---|
1333 |
free(info); |
---|
1334 |
} |
---|
1335 |
} |
---|
1336 |
} |
---|
1337 |
else |
---|
1338 |
uuidinfo = vuuidinfo; |
---|
1339 |
|
---|
1340 |
if(uuidinfo && uuidinfo->storagenode_id) { |
---|
1341 |
if((!dsn && dsn_out) || (!fqdn && fqdn_out)) { |
---|
1342 |
/* we don't have dsn and we actually want it */ |
---|
1343 |
pthread_mutex_lock(&storagenode_to_info_cache_lock); |
---|
1344 |
if(noit_hash_retrieve(&storagenode_to_info_cache, |
---|
1345 |
(void *)&uuidinfo->storagenode_id, sizeof(int), |
---|
1346 |
&vinfo)) |
---|
1347 |
info = vinfo; |
---|
1348 |
pthread_mutex_unlock(&storagenode_to_info_cache_lock); |
---|
1349 |
} |
---|
1350 |
} |
---|
1351 |
|
---|
1352 |
if(fqdn_out) *fqdn_out = info ? info->fqdn : NULL; |
---|
1353 |
if(dsn_out) *dsn_out = info ? info->dsn : NULL; |
---|
1354 |
assert(uuidinfo); |
---|
1355 |
if(remote_cn_out) *remote_cn_out = uuidinfo->remote_cn; |
---|
1356 |
if(storagenode_id_out) *storagenode_id_out = uuidinfo->storagenode_id; |
---|
1357 |
if(sid_out) *sid_out = uuidinfo->sid; |
---|
1358 |
if(fqdn) free(fqdn); |
---|
1359 |
if(dsn) free(dsn); |
---|
1360 |
if(new_remote_cn) free(new_remote_cn); |
---|
1361 |
return 0; |
---|
1362 |
} |
---|
1363 |
static int |
---|
1364 |
uuid_to_sid(const char *uuid_str_in, const char *remote_cn) { |
---|
1365 |
char uuid_str[UUID_STR_LEN+1]; |
---|
1366 |
int sid = 0; |
---|
1367 |
strlcpy(uuid_str, uuid_str_in, sizeof(uuid_str)); |
---|
1368 |
storage_node_quick_lookup(uuid_str, remote_cn, &sid, NULL, NULL, NULL, NULL); |
---|
1369 |
return sid; |
---|
1370 |
} |
---|
1371 |
|
---|
1372 |
static int |
---|
1373 |
stratcon_ingest_saveconfig() { |
---|
1374 |
int rv = -1; |
---|
1375 |
char *buff; |
---|
1376 |
ds_single_detail _d = { 0 }, *d = &_d; |
---|
1377 |
conn_q *cq; |
---|
1378 |
char ipv4_str[32]; |
---|
1379 |
struct in_addr r, l; |
---|
1380 |
|
---|
1381 |
r.s_addr = htonl((4 << 24) || (2 << 16) || (2 << 8) || 1); |
---|
1382 |
memset(&l, 0, sizeof(l)); |
---|
1383 |
noit_getip_ipv4(r, &l); |
---|
1384 |
/* Ignore the error.. what are we going to do anyway */ |
---|
1385 |
if(inet_ntop(AF_INET, &l, ipv4_str, sizeof(ipv4_str)) == NULL) |
---|
1386 |
strlcpy(ipv4_str, "0.0.0.0", sizeof(ipv4_str)); |
---|
1387 |
|
---|
1388 |
cq = get_conn_q_for_metanode(); |
---|
1389 |
|
---|
1390 |
if(stratcon_database_connect(cq) == 0) { |
---|
1391 |
char time_as_str[20]; |
---|
1392 |
size_t len; |
---|
1393 |
buff = noit_conf_xml_in_mem(&len); |
---|
1394 |
if(!buff) goto bad_row; |
---|
1395 |
|
---|
1396 |
snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL)); |
---|
1397 |
DECLARE_PARAM_STR(ipv4_str, strlen(ipv4_str)); |
---|
1398 |
DECLARE_PARAM_STR("", 0); |
---|
1399 |
DECLARE_PARAM_STR("stratcond", 9); |
---|
1400 |
DECLARE_PARAM_STR(time_as_str, strlen(time_as_str)); |
---|
1401 |
DECLARE_PARAM_STR(buff, len); |
---|
1402 |
free(buff); |
---|
1403 |
|
---|
1404 |
GET_QUERY(config_insert); |
---|
1405 |
PG_EXEC(config_insert); |
---|
1406 |
PQclear(d->res); |
---|
1407 |
rv = 0; |
---|
1408 |
|
---|
1409 |
bad_row: |
---|
1410 |
free_params(d); |
---|
1411 |
} |
---|
1412 |
release_conn_q(cq); |
---|
1413 |
return rv; |
---|
1414 |
} |
---|
1415 |
|
---|
1416 |
static int |
---|
1417 |
stratcon_ingest_launch_file_ingestion(const char *path, |
---|
1418 |
const char *remote_str, |
---|
1419 |
const char *remote_cn, |
---|
1420 |
const char *id_str) { |
---|
1421 |
pg_interim_journal_t *ij; |
---|
1422 |
char pgfile[PATH_MAX]; |
---|
1423 |
eventer_t ingest; |
---|
1424 |
|
---|
1425 |
if(strcmp(path + strlen(path) - 3, ".pg")) { |
---|
1426 |
snprintf(pgfile, sizeof(pgfile), "%s.pg", path); |
---|
1427 |
if(link(path, pgfile) < 0 && errno != EEXIST) { |
---|
1428 |
noitL(noit_error, "cannot link journal %s: %s\n", path, strerror(errno)); |
---|
1429 |
free(ij); |
---|
1430 |
return -1; |
---|
1431 |
} |
---|
1432 |
} |
---|
1433 |
else |
---|
1434 |
strlcpy(pgfile, path, sizeof(pgfile)); |
---|
1435 |
ij = calloc(1, sizeof(*ij)); |
---|
1436 |
ij->fd = open(pgfile, O_RDONLY); |
---|
1437 |
if(ij->fd < 0) { |
---|
1438 |
noitL(noit_error, "cannot open journal '%s': %s\n", |
---|
1439 |
pgfile, strerror(errno)); |
---|
1440 |
free(ij); |
---|
1441 |
return -1; |
---|
1442 |
} |
---|
1443 |
close(ij->fd); |
---|
1444 |
ij->fd = -1; |
---|
1445 |
ij->filename = strdup(pgfile); |
---|
1446 |
ij->remote_str = strdup(remote_str); |
---|
1447 |
ij->remote_cn = strdup(remote_cn); |
---|
1448 |
ij->storagenode_id = atoi(id_str); |
---|
1449 |
ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn, |
---|
1450 |
ij->fqdn); |
---|
1451 |
noitL(noit_debug, "ingesting payload: %s\n", ij->filename); |
---|
1452 |
ingest = eventer_alloc(); |
---|
1453 |
ingest->mask = EVENTER_ASYNCH; |
---|
1454 |
ingest->callback = stratcon_ingest_asynch_execute; |
---|
1455 |
ingest->closure = ij; |
---|
1456 |
eventer_add_asynch(ij->cpool->jobq, ingest); |
---|
1457 |
return 0; |
---|
1458 |
} |
---|
1459 |
|
---|
1460 |
int |
---|
1461 |
stratcon_ingest_all_storagenode_info() { |
---|
1462 |
int i, cnt = 0; |
---|
1463 |
ds_single_detail _d = { 0 }, *d = &_d; |
---|
1464 |
conn_q *cq; |
---|
1465 |
cq = get_conn_q_for_metanode(); |
---|
1466 |
|
---|
1467 |
while(stratcon_database_connect(cq)) { |
---|
1468 |
noitL(noit_error, "Error connecting to database\n"); |
---|
1469 |
sleep(1); |
---|
1470 |
} |
---|
1471 |
|
---|
1472 |
GET_QUERY(all_storage); |
---|
1473 |
PG_EXEC(all_storage); |
---|
1474 |
cnt = PQntuples(d->res); |
---|
1475 |
for(i=0; i<cnt; i++) { |
---|
1476 |
void *vinfo; |
---|
1477 |
char *tmpint, *fqdn, *dsn; |
---|
1478 |
int storagenode_id; |
---|
1479 |
PG_GET_STR_COL(tmpint, i, "storage_node_id"); |
---|
1480 |
storagenode_id = atoi(tmpint); |
---|
1481 |
PG_GET_STR_COL(fqdn, i, "fqdn"); |
---|
1482 |
PG_GET_STR_COL(dsn, i, "dsn"); |
---|
1483 |
PG_GET_STR_COL(tmpint, i, "storage_node_id"); |
---|
1484 |
storagenode_id = tmpint ? atoi(tmpint) : 0; |
---|
1485 |
|
---|
1486 |
if(!noit_hash_retrieve(&storagenode_to_info_cache, |
---|
1487 |
(void *)&storagenode_id, sizeof(int), &vinfo)) { |
---|
1488 |
storagenode_info *info; |
---|
1489 |
info = calloc(1, sizeof(*info)); |
---|
1490 |
info->storagenode_id = storagenode_id; |
---|
1491 |
info->fqdn = fqdn ? strdup(fqdn) : NULL; |
---|
1492 |
info->dsn = dsn ? strdup(dsn) : NULL; |
---|
1493 |
noit_hash_store(&storagenode_to_info_cache, |
---|
1494 |
(void *)&info->storagenode_id, sizeof(int), info); |
---|
1495 |
noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n", |
---|
1496 |
info->storagenode_id, |
---|
1497 |
info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : ""); |
---|
1498 |
} |
---|
1499 |
noit_watchdog_child_heartbeat(); |
---|
1500 |
} |
---|
1501 |
PQclear(d->res); |
---|
1502 |
bad_row: |
---|
1503 |
free_params(d); |
---|
1504 |
|
---|
1505 |
release_conn_q(cq); |
---|
1506 |
noitL(noit_error, "Loaded %d storage nodes\n", cnt); |
---|
1507 |
return cnt; |
---|
1508 |
} |
---|
1509 |
int |
---|
1510 |
stratcon_ingest_all_check_info() { |
---|
1511 |
int i, cnt, loaded = 0; |
---|
1512 |
ds_single_detail _d = { 0 }, *d = &_d; |
---|
1513 |
conn_q *cq; |
---|
1514 |
cq = get_conn_q_for_metanode(); |
---|
1515 |
|
---|
1516 |
while(stratcon_database_connect(cq)) { |
---|
1517 |
noitL(noit_error, "Error connecting to database\n"); |
---|
1518 |
sleep(1); |
---|
1519 |
} |
---|
1520 |
|
---|
1521 |
GET_QUERY(check_mapall); |
---|
1522 |
PG_EXEC(check_mapall); |
---|
1523 |
cnt = PQntuples(d->res); |
---|
1524 |
for(i=0; i<cnt; i++) { |
---|
1525 |
void *vinfo; |
---|
1526 |
char *tmpint, *fqdn, *dsn, *uuid_str, *remote_cn; |
---|
1527 |
int sid, storagenode_id; |
---|
1528 |
uuid_info *uuidinfo; |
---|
1529 |
PG_GET_STR_COL(uuid_str, i, "id"); |
---|
1530 |
if(!uuid_str) continue; |
---|
1531 |
PG_GET_STR_COL(tmpint, i, "sid"); |
---|
1532 |
if(!tmpint) continue; |
---|
1533 |
sid = atoi(tmpint); |
---|
1534 |
PG_GET_STR_COL(fqdn, i, "fqdn"); |
---|
1535 |
PG_GET_STR_COL(dsn, i, "dsn"); |
---|
1536 |
PG_GET_STR_COL(remote_cn, i, "remote_cn"); |
---|
1537 |
PG_GET_STR_COL(tmpint, i, "storage_node_id"); |
---|
1538 |
storagenode_id = tmpint ? atoi(tmpint) : 0; |
---|
1539 |
|
---|
1540 |
uuidinfo = calloc(1, sizeof(*uuidinfo)); |
---|
1541 |
uuidinfo->uuid_str = strdup(uuid_str); |
---|
1542 |
uuidinfo->remote_cn = strdup(remote_cn); |
---|
1543 |
uuidinfo->storagenode_id = storagenode_id; |
---|
1544 |
uuidinfo->sid = sid; |
---|
1545 |
noit_hash_store(&uuid_to_info_cache, |
---|
1546 |
uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo); |
---|
1547 |
noitL(ds_deb, "CHECK[%s] -> { remote_cn: '%s', storagenode_id: '%d' }\n", |
---|
1548 |
uuidinfo->uuid_str, uuidinfo->remote_cn, uuidinfo->storagenode_id); |
---|
1549 |
loaded++; |
---|
1550 |
if(!noit_hash_retrieve(&storagenode_to_info_cache, |
---|
1551 |
(void *)&storagenode_id, sizeof(int), &vinfo)) { |
---|
1552 |
storagenode_info *info; |
---|
1553 |
info = calloc(1, sizeof(*info)); |
---|
1554 |
info->storagenode_id = storagenode_id; |
---|
1555 |
info->fqdn = fqdn ? strdup(fqdn) : NULL; |
---|
1556 |
info->dsn = dsn ? strdup(dsn) : NULL; |
---|
1557 |
noit_hash_store(&storagenode_to_info_cache, |
---|
1558 |
(void *)&info->storagenode_id, sizeof(int), info); |
---|
1559 |
noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n", |
---|
1560 |
info->storagenode_id, |
---|
1561 |
info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : ""); |
---|
1562 |
} |
---|
1563 |
noit_watchdog_child_heartbeat(); |
---|
1564 |
} |
---|
1565 |
PQclear(d->res); |
---|
1566 |
bad_row: |
---|
1567 |
free_params(d); |
---|
1568 |
|
---|
1569 |
release_conn_q(cq); |
---|
1570 |
noitL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded); |
---|
1571 |
return loaded; |
---|
1572 |
} |
---|
1573 |
|
---|
1574 |
static char * |
---|
1575 |
stratcon_get_noit_config(const char *cn) { |
---|
1576 |
ds_single_detail *d; |
---|
1577 |
int row_count = 0; |
---|
1578 |
const char *xml = NULL; |
---|
1579 |
char *xmlcopy = NULL; |
---|
1580 |
conn_q *cq = NULL; |
---|
1581 |
|
---|
1582 |
d = calloc(1, sizeof(*d)); |
---|
1583 |
GET_QUERY(config_get); |
---|
1584 |
cq = get_conn_q_for_metanode(); |
---|
1585 |
if(!cq) goto bad_row; |
---|
1586 |
|
---|
1587 |
DECLARE_PARAM_STR(cn, cn ? strlen(cn) : 0); |
---|
1588 |
PG_EXEC(config_get); |
---|
1589 |
row_count = PQntuples(d->res); |
---|
1590 |
if(row_count == 1) PG_GET_STR_COL(xml, 0, "config"); |
---|
1591 |
|
---|
1592 |
if(xml) xmlcopy = strdup(xml); |
---|
1593 |
|
---|
1594 |
bad_row: |
---|
1595 |
free_params((ds_single_detail *)d); |
---|
1596 |
d->nparams = 0; |
---|
1597 |
if(cq) release_conn_q(cq); |
---|
1598 |
|
---|
1599 |
return xmlcopy; |
---|
1600 |
} |
---|
1601 |
|
---|
1602 |
static ingestor_api_t postgres_ingestor_api = { |
---|
1603 |
.launch_file_ingestion = stratcon_ingest_launch_file_ingestion, |
---|
1604 |
.iep_check_preload = stratcon_ingest_iep_check_preload, |
---|
1605 |
.storage_node_lookup = storage_node_quick_lookup, |
---|
1606 |
.submit_realtime_lookup = stratcon_ingestor_submit_lookup, |
---|
1607 |
.get_noit_config = stratcon_get_noit_config, |
---|
1608 |
.save_config = stratcon_ingest_saveconfig |
---|
1609 |
}; |
---|
1610 |
|
---|
1611 |
/*
 * Module configuration hook.  Both arguments are ignored; this always
 * returns 0.
 */
static int
postgres_ingestor_config(noit_module_generic_t *self, noit_hash_table *o)
{
  return 0;
}
---|
1614 |
/*
 * Image onload hook.  The argument is ignored; this always returns 0.
 */
static int
postgres_ingestor_onload(noit_image_t *self)
{
  return 0;
}
---|
1617 |
/*
 * Journal-file filter passed to stratcon_ingest_sweep_journals().
 *
 * Pings the watchdog heartbeat, then returns 1 when `file` is exactly
 * 19 characters long and its last three characters are ".pg", and 0
 * otherwise.  `file` must not be NULL.
 */
static int
is_postgres_ingestor_file(const char *file)
{
  size_t len;

  noit_watchdog_child_heartbeat();

  len = strlen(file);
  if(len != 19) return 0;
  /* With a 19-character name, offset 16 is the start of the final three. */
  return strcmp(file + 16, ".pg") == 0;
}
---|
1621 |
/*
 * Module init hook for the postgres ingestor.
 *
 * Initializes the datastore core and this file's mutexes, resolves the
 * log streams, reads the journal base path from the configuration (the
 * process exits if it is missing), loads check and storage-node info,
 * sweeps the existing journals, and finally registers
 * postgres_ingestor_api as the datastore ingestor.
 *
 * Returns whatever stratcon_datastore_set_ingestor() returns.
 */
static int
postgres_ingestor_init(noit_module_generic_t *self)
{
  stratcon_datastore_core_init();

  pthread_mutex_init(&ds_conns_lock, NULL);
  pthread_mutex_init(&storagenode_to_info_cache_lock, NULL);

  /* Error streams fall back to noit_error; debug streams get no fallback. */
  ds_err = noit_log_stream_find("error/datastore");
  if(ds_err == NULL) ds_err = noit_error;
  ds_deb = noit_log_stream_find("debug/datastore");
  ds_pool_deb = noit_log_stream_find("debug/datastore_pool");
  ingest_err = noit_log_stream_find("error/ingest");
  if(ingest_err == NULL) ingest_err = noit_error;

  /* The journal path is mandatory. */
  if(!noit_conf_get_string(NULL, "/stratcon/database/journal/path",
                           &basejpath)) {
    noitL(noit_error, "/stratcon/database/journal/path is unspecified\n");
    exit(-1);
  }

  stratcon_ingest_all_check_info();
  stratcon_ingest_all_storagenode_info();
  stratcon_ingest_sweep_journals(is_postgres_ingestor_file,
                                 stratcon_ingest_launch_file_ingestion);

  return stratcon_datastore_set_ingestor(&postgres_ingestor_api);
}
---|
1642 |
|
---|
1643 |
noit_module_generic_t postgres_ingestor = { |
---|
1644 |
{ |
---|
1645 |
NOIT_GENERIC_MAGIC, |
---|
1646 |
NOIT_GENERIC_ABI_VERSION, |
---|
1647 |
"postgres_ingestor", |
---|
1648 |
"postgres drive for data ingestion", |
---|
1649 |
postgres_ingestor_xml_description, |
---|
1650 |
postgres_ingestor_onload, |
---|
1651 |
}, |
---|
1652 |
postgres_ingestor_config, |
---|
1653 |
postgres_ingestor_init |
---|
1654 |
}; |
---|