root/src/stratcon_datastore.c

Revision fd806572060400af50d983bcef55dd90a6c39bde, 16.0 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 3 years ago)

too specific

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2009, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_log.h"
36 #include "utils/noit_b64.h"
37 #include "utils/noit_str.h"
38 #include "utils/noit_mkdir.h"
39 #include "utils/noit_getip.h"
40 #include "stratcon_datastore.h"
41 #include "stratcon_realtime_http.h"
42 #include "stratcon_iep.h"
43 #include "noit_conf.h"
44 #include "noit_check.h"
45 #include "noit_rest.h"
46 #include <unistd.h>
47 #include <fcntl.h>
48 #include <netinet/in.h>
49 #include <sys/un.h>
50 #include <dirent.h>
51 #include <arpa/inet.h>
52 #include <sys/mman.h>
53 #include <zlib.h>
54 #include <assert.h>
55 #include <errno.h>
56
/* Log streams used by this file.  They are looked up by name in
 * stratcon_datastore_core_init(); ds_err and ingest_err fall back to
 * noit_error there when the named stream is not found. */
57 static noit_log_stream_t ds_err = NULL;
58 static noit_log_stream_t ds_deb = NULL;
59 static noit_log_stream_t ds_pool_deb = NULL;
60 static noit_log_stream_t ingest_err = NULL;
/* Root directory for journal files, read from //database/journal/path. */
61 static char *basejpath = NULL;
62
/* Ingestor of the first chain node; refreshed on every call to
 * stratcon_datastore_set_ingestor(). */
63 static ingestor_api_t *ingestor = NULL;
/* Node of the singly linked list of registered ingestors. */
64 typedef struct ingest_chain_t {
65   ingestor_api_t *ingestor;
66   struct ingest_chain_t *next;
67 } ingest_chain_t;
68
/* Head of the ingestor list; new nodes are appended at the tail. */
69 static ingest_chain_t *ingestor_chain;
70
/* Datastore on/off switch; starts out enabled (1). */
static int ds_system_enabled = 1;

/* Return the current value of the datastore enable flag. */
int
stratcon_datastore_get_enabled() {
  return ds_system_enabled;
}

/* Store n, unmodified, as the new datastore enable flag. */
void
stratcon_datastore_set_enabled(int n) {
  ds_system_enabled = n;
}
74 int stratcon_datastore_set_ingestor(ingestor_api_t *ni) {
75   ingest_chain_t *p, *i = calloc(1, sizeof(*i));
76   i->ingestor = ni;
77   if(!ingestor_chain) ingestor_chain = i;
78   else {
79     for(p = ingestor_chain; p->next; p = p->next);
80     p->next = i;
81   }
82   ingestor = ingestor_chain->ingestor;
83   return 0;
84 }
85
/* Singly linked list of observer callbacks.  Each dispatch function is
 * invoked by stratcon_datastore_push() with the operation, remote address,
 * remote cn and operand; nodes are added at the head by
 * stratcon_datastore_register_onlooker(). */
86 static struct datastore_onlooker_list {
87   void (*dispatch)(stratcon_datastore_op_t, struct sockaddr *,
88                    const char *, void *);
89   struct datastore_onlooker_list *next;
90 } *onlookers = NULL;
91
/* Closure handed to stratcon_datastore_journal_sync(): the working set that
 * was removed from working_sets for a checkpoint, plus the optional event
 * to fire once the sync job has finished. */
92 typedef struct {
93   noit_hash_table *ws;
94   eventer_t completion;
95 } syncset_t;
96
/* Maps remote_cn -> per-remote working set (a noit_hash_table that itself
 * maps fqdn -> interim_journal_t); see interim_journal_get(). */
97 noit_hash_table working_sets;
98
99 static void
100 interim_journal_free(void *vij) {
101   interim_journal_t *ij = vij;
102   if(ij->filename) free(ij->filename);
103   if(ij->remote_str) free(ij->remote_str);
104   if(ij->remote_cn) free(ij->remote_cn);
105   if(ij->fqdn) free(ij->fqdn);
106   free(ij);
107 }
108
109 static void
110 stratcon_ingest_sweep_journals_int(char *first, char *second, char *third,
111                                    int (*test)(const char *),
112                                    int (*ingest)(const char *fullpath,
113                                                  const char *remote_str,
114                                                  const char *remote_cn,
115                                                  const char *id_str)) {
116   char path[PATH_MAX];
117   DIR *root;
118   struct dirent *de, *entry;
119   int i = 0, cnt = 0;
120   char **entries;
121   int size = 0;
122
123   snprintf(path, sizeof(path), "%s%s%s%s%s%s%s", basejpath,
124            first ? "/" : "", first ? first : "",
125            second ? "/" : "", second ? second : "",
126            third ? "/" : "", third ? third : "");
127 #ifdef _PC_NAME_MAX
128   size = pathconf(path, _PC_NAME_MAX);
129 #endif
130   size = MAX(size, PATH_MAX + 128);
131   de = alloca(size);
132   root = opendir(path);
133   if(!root) return;
134   while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) cnt++;
135   closedir(root);
136   root = opendir(path);
137   if(!root) return;
138   entries = malloc(sizeof(*entries) * cnt);
139   while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) {
140     if(i < cnt) {
141       entries[i++] = strdup(entry->d_name);
142     }
143   }
144   closedir(root);
145   cnt = i; /* could have changed, directories are fickle */
146   qsort(entries, i, sizeof(*entries),
147         (int (*)(const void *, const void *))strcasecmp);
148   for(i=0; i<cnt; i++) {
149     if(!strcmp(entries[i], ".") || !strcmp(entries[i], "..")) continue;
150     noitL(ds_deb, "Processing L%d entry '%s'\n",
151           third ? 4 : second ? 3 : first ? 2 : 1, entries[i]);
152     if(!first)
153       stratcon_ingest_sweep_journals_int(entries[i], NULL, NULL, test, ingest);
154     else if(!second)
155       stratcon_ingest_sweep_journals_int(first, entries[i], NULL, test, ingest);
156     else if(!third)
157       stratcon_ingest_sweep_journals_int(first, second, entries[i], test, ingest);
158     else if(test(entries[i])) {
159       char fullpath[PATH_MAX];
160       snprintf(fullpath, sizeof(fullpath), "%s/%s/%s/%s/%s", basejpath,
161                first,second,third,entries[i]);
162       ingest(fullpath,first,second,third);
163     }
164   }
165   for(i=0; i<cnt; i++)
166     free(entries[i]);
167   free(entries);
168 }
/*
 * Public entry point for a journal sweep: starts the recursive walk at the
 * journal root with no directory levels selected yet.  `test` decides which
 * leaf entries qualify and `ingest` is called for each qualifying one.
 */
void
stratcon_ingest_sweep_journals(int (*test)(const char *),
                               int (*ingest)(const char *fullpath,
                                             const char *remote_str,
                                             const char *remote_cn,
                                             const char *id_str)) {
  char *no_level = NULL;

  stratcon_ingest_sweep_journals_int(no_level, no_level, no_level,
                                     test, ingest);
}
177
178 static int
179 stratcon_ingest(const char *fullpath, const char *remote_str,
180                 const char *remote_cn, const char *id_str) {
181   ingest_chain_t *ic;
182   int err = 0;
183   for(ic = ingestor_chain; ic; ic = ic->next)
184     if(ic->ingestor->launch_file_ingestion(fullpath, remote_str,
185                                            remote_cn, id_str))
186       err = -1;
187   if(err == 0) {
188     unlink(fullpath);
189   }
190   return err;
191 }
192 static int
193 stratcon_datastore_journal_sync(eventer_t e, int mask, void *closure,
194                                 struct timeval *now) {
195   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
196   const char *k;
197   int klen;
198   void *vij;
199   interim_journal_t *ij;
200   syncset_t *syncset = closure;
201
202   if((mask & EVENTER_ASYNCH) == EVENTER_ASYNCH) {
203     if(syncset->completion) {
204       eventer_add(syncset->completion);
205       eventer_trigger(syncset->completion, EVENTER_READ | EVENTER_WRITE);
206     }
207     free(syncset);
208     return 0;
209   }
210   if(!((mask & EVENTER_ASYNCH_WORK) == EVENTER_ASYNCH_WORK)) return 0;
211
212   noitL(ds_deb, "Syncing journal sets...\n");
213   while(noit_hash_next(syncset->ws, &iter, &k, &klen, &vij)) {
214     char tmppath[PATH_MAX], id_str[32];
215     int suffix_idx;
216     ij = vij;
217     noitL(ds_deb, "Syncing journal set [%s,%s,%s]\n",
218           ij->remote_str, ij->remote_cn, ij->fqdn);
219     strlcpy(tmppath, ij->filename, sizeof(tmppath));
220     suffix_idx = strlen(ij->filename) - 4; /* . t m p */
221     ij->filename[suffix_idx] = '\0';
222     if(rename(tmppath, ij->filename) != 0) {
223       if(errno == EEXIST) {
224         unlink(ij->filename);
225         if(rename(tmppath, ij->filename) != 0) goto rename_failed;
226       }
227       else {
228        rename_failed:
229         noitL(noit_error, "rename failed(%s): (%s->%s)\n", strerror(errno),
230               tmppath, ij->filename);
231         exit(-1);
232       }
233     }
234     fsync(ij->fd);
235     close(ij->fd);
236     ij->fd = -1;
237     snprintf(id_str, sizeof(id_str), "%d", ij->storagenode_id);
238     stratcon_ingest(ij->filename, ij->remote_str,
239                     ij->remote_cn, id_str);
240   }
241   noit_hash_destroy(syncset->ws, free, interim_journal_free);
242   free(syncset->ws);
243   return 0;
244 }
245 static interim_journal_t *
246 interim_journal_get(struct sockaddr *remote, const char *remote_cn_in,
247                     int storagenode_id, const char *fqdn_in) {
248   void *vhash, *vij;
249   noit_hash_table *working_set;
250   interim_journal_t *ij;
251   struct timeval now;
252   char jpath[PATH_MAX];
253   char remote_str[128];
254   const char *remote_cn = remote_cn_in ? remote_cn_in : "default";
255   const char *fqdn = fqdn_in ? fqdn_in : "default";
256
257   noit_convert_sockaddr_to_buff(remote_str, sizeof(remote_str), remote);
258   if(!*remote_str) strlcpy(remote_str, "default", sizeof(remote_str));
259
260   /* Lookup the working set */
261   if(!noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
262     working_set = calloc(1, sizeof(*working_set));
263     noit_hash_store(&working_sets, strdup(remote_cn), strlen(remote_cn),
264                     working_set);
265   }
266   else
267     working_set = vhash;
268
269   /* Lookup the interim journal within the working set */
270   if(!noit_hash_retrieve(working_set, fqdn, strlen(fqdn), &vij)) {
271     ij = calloc(1, sizeof(*ij));
272     gettimeofday(&now, NULL);
273     snprintf(jpath, sizeof(jpath), "%s/%s/%s/%d/%08x%08x.tmp",
274              basejpath, remote_str, remote_cn, storagenode_id,
275              (unsigned int)now.tv_sec, (unsigned int)now.tv_usec);
276     ij->remote_str = strdup(remote_str);
277     ij->remote_cn = strdup(remote_cn);
278     ij->fqdn = fqdn_in ? strdup(fqdn_in) : NULL;
279     ij->storagenode_id = storagenode_id;
280     ij->filename = strdup(jpath);
281     ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
282     if(ij->fd < 0 && errno == ENOENT) {
283       if(mkdir_for_file(ij->filename, 0750)) {
284         noitL(noit_error, "Failed to create dir for '%s': %s\n",
285               ij->filename, strerror(errno));
286         exit(-1);
287       }
288       ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
289     }
290     if(ij->fd < 0 && errno == EEXIST) {
291       /* This can only occur if we crash after before checkpointing */
292       unlink(ij->filename);
293       ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
294     }
295     if(ij->fd < 0) {
296       noitL(noit_error, "Failed to open interim journal '%s': %s\n",
297             ij->filename, strerror(errno));
298       exit(-1);
299     }
300     noit_hash_store(working_set, strdup(fqdn), strlen(fqdn), ij);
301   }
302   else
303     ij = vij;
304
305   return ij;
306 }
307 static void
308 stratcon_datastore_journal(struct sockaddr *remote,
309                            const char *remote_cn, char *line) {
310   interim_journal_t *ij = NULL;
311   char uuid_str[UUID_STR_LEN+1], *cp1, *cp2;
312   char rtype[256];
313   const char *fqdn = NULL, *dsn = NULL;
314   int storagenode_id = 0;
315   uuid_t checkid;
316   if(!line) return;
317   cp1 = strchr(line, '\t');
318   if(cp1 && cp1 - line < sizeof(rtype) - 1) {
319     memcpy(rtype, line, cp1 - line);
320     rtype[cp1 - line] = '\0';
321   }
322   else rtype[0] = '\0';
323   /* if it is a UUID based thing, find the storage node */
324   switch(*rtype) {
325     case 'C':
326     case 'S':
327     case 'M':
328     case 'D':
329     case 'B':
330       if((cp1 = strchr(cp1+1, '\t')) != NULL &&
331          (cp2 = strchr(cp1+1, '\t')) != NULL &&
332          (cp2-cp1 >= UUID_STR_LEN)) {
333         strlcpy(uuid_str, cp2 - UUID_STR_LEN, sizeof(uuid_str));
334         if(!uuid_parse(uuid_str, checkid)) {
335           ingestor->storage_node_lookup(uuid_str, remote_cn, NULL,
336                                         &storagenode_id, NULL,
337                                         &fqdn, &dsn);
338           ij = interim_journal_get(remote, remote_cn, storagenode_id, fqdn);
339         }
340       }
341       break;
342     case 'n':
343       ij = interim_journal_get(remote,remote_cn,0,NULL);
344       break;
345     default:
346       break;
347   }
348   if(!ij) {
349     noitL(ingest_err, "%d\t%s\n", storagenode_id, line);
350   }
351   else {
352     int len;
353     len = write(ij->fd, line, strlen(line));
354     if(len < 0) {
355       noitL(noit_error, "write to %s failed: %s\n",
356             ij->filename, strerror(errno));
357     }
358   }
359   free(line);
360   return;
361 }
362 static noit_hash_table *
363 stratcon_datastore_journal_remove(struct sockaddr *remote,
364                                   const char *remote_cn) {
365   void *vhash = NULL;
366   if(noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
367     /* pluck it out */
368     noit_hash_delete(&working_sets, remote_cn, strlen(remote_cn), free, NULL);
369   }
370   else {
371     noitL(noit_error, "attempted checkpoint on non-existing workingset: '%s'\n",
372           remote_cn);
373     abort();
374   }
375   return vhash;
376 }
377 void
378 stratcon_datastore_push(stratcon_datastore_op_t op,
379                         struct sockaddr *remote,
380                         const char *remote_cn, void *operand,
381                         eventer_t completion) {
382   syncset_t *syncset;
383   eventer_t e;
384   struct realtime_tracker *rt;
385   struct datastore_onlooker_list *nnode;
386
387   for(nnode = onlookers; nnode; nnode = nnode->next)
388     nnode->dispatch(op,remote,remote_cn,operand);
389
390   switch(op) {
391     case DS_OP_INSERT:
392       stratcon_datastore_journal(remote, remote_cn, (char *)operand);
393       break;
394     case DS_OP_CHKPT:
395       e = eventer_alloc();
396       syncset = calloc(1, sizeof(*syncset));
397       e->mask = EVENTER_ASYNCH;
398       e->callback = stratcon_datastore_journal_sync;
399       syncset->ws = stratcon_datastore_journal_remove(remote, remote_cn);
400       syncset->completion = completion;
401       e->closure = syncset;
402       eventer_add(e);
403       break;
404     case DS_OP_FIND_COMPLETE:
405       rt = operand;
406       ingestor->submit_realtime_lookup(rt, completion);
407       break;
408   }
409 }
410
411 void
412 stratcon_datastore_register_onlooker(void (*f)(stratcon_datastore_op_t,
413                                                struct sockaddr *,
414                                                const char *, void *)) {
415   struct datastore_onlooker_list *nnode;
416   volatile void **vonlookers = (void *)&onlookers;
417   nnode = calloc(1, sizeof(*nnode));
418   nnode->dispatch = f;
419   nnode->next = onlookers;
420   while(noit_atomic_casptr(vonlookers,
421                            nnode, nnode->next) != (void *)nnode->next)
422     nnode->next = onlookers;
423 }
424
425 static int
426 rest_get_noit_config(noit_http_rest_closure_t *restc,
427                      int npats, char **pats) {
428   noit_http_session_ctx *ctx = restc->http_ctx;
429   char *xml = NULL;
430
431   if(npats != 0) {
432     noit_http_response_server_error(ctx, "text/xml");
433     noit_http_response_end(ctx);
434     return 0;
435   }
436
437   xml = ingestor->get_noit_config(restc->remote_cn);
438
439   if(xml == NULL) {
440     char buff[1024];
441     snprintf(buff, sizeof(buff), "<error><remote_cn>%s</remote_cn>"
442                                  "<row_count>%d</row_count></error>\n",
443              restc->remote_cn, 0);
444     noit_http_response_append(ctx, buff, strlen(buff));
445     noit_http_response_not_found(ctx, "text/xml");
446   }
447   else {
448     noit_http_response_append(ctx, xml, strlen(xml));
449     noit_http_response_ok(ctx, "text/xml");
450   }
451
452   if(xml) free(xml);
453   noit_http_response_end(ctx);
454   return 0;
455 }
456
457 void
458 stratcon_datastore_iep_check_preload() {
459   ingestor->iep_check_preload();
460 }
461
462 int
463 stratcon_datastore_saveconfig(void *unused) {
464   return ingestor->save_config();
465 }
466
/*
 * Sweep filter: a directory entry counts as a raw ingestion file when its
 * name is exactly 16 characters long.  Returns 1 if so, 0 otherwise.
 */
static int is_raw_ingestion_file(const char *file) {
  const size_t expected_len = 16;

  if(strlen(file) != expected_len) return 0;
  return 1;
}
470
471 void
472 stratcon_datastore_core_init() {
473   ds_err = noit_log_stream_find("error/datastore");
474   ds_deb = noit_log_stream_find("debug/datastore");
475   ds_pool_deb = noit_log_stream_find("debug/datastore_pool");
476   ingest_err = noit_log_stream_find("error/ingest");
477   if(!ds_err) ds_err = noit_error;
478   if(!ingest_err) ingest_err = noit_error;
479   if(!noit_conf_get_string(NULL, "//database/journal/path",
480                            &basejpath)) {
481     noitL(noit_error, "//database/journal/path is unspecified\n");
482     exit(-1);
483   }
484 }
485 void
486 stratcon_datastore_init() {
487   stratcon_datastore_core_init();
488
489   stratcon_ingest_sweep_journals(is_raw_ingestion_file,
490                                  stratcon_ingest);
491
492   assert(noit_http_rest_register_auth(
493     "GET", "/noits/", "^config$", rest_get_noit_config,
494              noit_http_rest_client_cert_auth
495   ) == 0);
496 }
497
Note: See TracBrowser for help on using the browser.