root/src/stratcon_datastore.c

Revision d10f13b10b05856e66c66dc5f7efb410316d9d6d, 12.3 kB (checked in by Dan Di Spaltro <dan@cloudkick.com>, 3 years ago)

Add Delete messages to noit so we get a message when we deschedule a check

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2009, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_log.h"
36 #include "utils/noit_b64.h"
37 #include "utils/noit_str.h"
38 #include "utils/noit_mkdir.h"
39 #include "utils/noit_getip.h"
40 #include "stratcon_datastore.h"
41 #include "stratcon_realtime_http.h"
42 #include "stratcon_iep.h"
43 #include "noit_conf.h"
44 #include "noit_check.h"
45 #include "noit_rest.h"
46 #include <unistd.h>
47 #include <fcntl.h>
48 #include <netinet/in.h>
49 #include <sys/un.h>
50 #include <dirent.h>
51 #include <arpa/inet.h>
52 #include <sys/mman.h>
53 #include <zlib.h>
54 #include <assert.h>
55 #include <errno.h>
56
57 static noit_log_stream_t ds_err = NULL;
58 static noit_log_stream_t ds_deb = NULL;
59 static noit_log_stream_t ds_pool_deb = NULL;
60 static noit_log_stream_t ingest_err = NULL;
61 static char *basejpath = NULL;
62
63 static ingestor_api_t *ingestor = NULL;
/* Enabled flag for the datastore subsystem; starts out as 1. */
static int ds_system_enabled = 1;

/* Returns the current value of the datastore enabled flag. */
int
stratcon_datastore_get_enabled() {
  return ds_system_enabled;
}

/* Sets the datastore enabled flag to n (stored verbatim). */
void
stratcon_datastore_set_enabled(int n) {
  ds_system_enabled = n;
}
67 int stratcon_datastore_set_ingestor(ingestor_api_t *ni) {
68   if(ingestor) return -1;
69   ingestor = ni;
70   return 0;
71 }
72
73 static struct datastore_onlooker_list {
74   void (*dispatch)(stratcon_datastore_op_t, struct sockaddr *,
75                    const char *, void *);
76   struct datastore_onlooker_list *next;
77 } *onlookers = NULL;
78
79 typedef struct {
80   noit_hash_table *ws;
81   eventer_t completion;
82 } syncset_t;
83
84 noit_hash_table working_sets;
85
86 static void
87 interim_journal_free(void *vij) {
88   interim_journal_t *ij = vij;
89   if(ij->filename) free(ij->filename);
90   if(ij->remote_str) free(ij->remote_str);
91   if(ij->remote_cn) free(ij->remote_cn);
92   if(ij->fqdn) free(ij->fqdn);
93   free(ij);
94 }
95 static int
96 stratcon_datastore_journal_sync(eventer_t e, int mask, void *closure,
97                                 struct timeval *now) {
98   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
99   const char *k;
100   int klen;
101   void *vij;
102   interim_journal_t *ij;
103   syncset_t *syncset = closure;
104
105   if((mask & EVENTER_ASYNCH) == EVENTER_ASYNCH) {
106     if(syncset->completion) {
107       eventer_add(syncset->completion);
108       eventer_trigger(syncset->completion, EVENTER_READ | EVENTER_WRITE);
109     }
110     free(syncset);
111     return 0;
112   }
113   if(!((mask & EVENTER_ASYNCH_WORK) == EVENTER_ASYNCH_WORK)) return 0;
114
115   noitL(ds_deb, "Syncing journal sets...\n");
116   while(noit_hash_next(syncset->ws, &iter, &k, &klen, &vij)) {
117     char tmppath[PATH_MAX], id_str[32];
118     int suffix_idx;
119     ij = vij;
120     noitL(ds_deb, "Syncing journal set [%s,%s,%s]\n",
121           ij->remote_str, ij->remote_cn, ij->fqdn);
122     strlcpy(tmppath, ij->filename, sizeof(tmppath));
123     suffix_idx = strlen(ij->filename) - 4; /* . t m p */
124     ij->filename[suffix_idx] = '\0';
125     if(rename(tmppath, ij->filename) != 0) {
126       if(errno == EEXIST) {
127         unlink(ij->filename);
128         if(rename(tmppath, ij->filename) != 0) goto rename_failed;
129       }
130       else {
131        rename_failed:
132         noitL(noit_error, "rename failed(%s): (%s->%s)\n", strerror(errno),
133               tmppath, ij->filename);
134         exit(-1);
135       }
136     }
137     fsync(ij->fd);
138     close(ij->fd);
139     ij->fd = -1;
140     snprintf(id_str, sizeof(id_str), "%d", ij->storagenode_id);
141     ingestor->launch_file_ingestion(ij->filename, ij->remote_str,
142                                     ij->remote_cn, id_str);
143   }
144   noit_hash_destroy(syncset->ws, free, interim_journal_free);
145   free(syncset->ws);
146   return 0;
147 }
148 static interim_journal_t *
149 interim_journal_get(struct sockaddr *remote, const char *remote_cn_in,
150                     int storagenode_id, const char *fqdn_in) {
151   void *vhash, *vij;
152   noit_hash_table *working_set;
153   interim_journal_t *ij;
154   struct timeval now;
155   char jpath[PATH_MAX];
156   char remote_str[128];
157   const char *remote_cn = remote_cn_in ? remote_cn_in : "default";
158   const char *fqdn = fqdn_in ? fqdn_in : "default";
159
160   noit_convert_sockaddr_to_buff(remote_str, sizeof(remote_str), remote);
161   if(!*remote_str) strlcpy(remote_str, "default", sizeof(remote_str));
162
163   /* Lookup the working set */
164   if(!noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
165     working_set = calloc(1, sizeof(*working_set));
166     noit_hash_store(&working_sets, strdup(remote_cn), strlen(remote_cn),
167                     working_set);
168   }
169   else
170     working_set = vhash;
171
172   /* Lookup the interim journal within the working set */
173   if(!noit_hash_retrieve(working_set, fqdn, strlen(fqdn), &vij)) {
174     ij = calloc(1, sizeof(*ij));
175     gettimeofday(&now, NULL);
176     snprintf(jpath, sizeof(jpath), "%s/%s/%s/%d/%08x%08x.tmp",
177              basejpath, remote_str, remote_cn, storagenode_id,
178              (unsigned int)now.tv_sec, (unsigned int)now.tv_usec);
179     ij->remote_str = strdup(remote_str);
180     ij->remote_cn = strdup(remote_cn);
181     ij->fqdn = fqdn_in ? strdup(fqdn_in) : NULL;
182     ij->storagenode_id = storagenode_id;
183     ij->filename = strdup(jpath);
184     ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
185     if(ij->fd < 0 && errno == ENOENT) {
186       if(mkdir_for_file(ij->filename, 0750)) {
187         noitL(noit_error, "Failed to create dir for '%s': %s\n",
188               ij->filename, strerror(errno));
189         exit(-1);
190       }
191       ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
192     }
193     if(ij->fd < 0 && errno == EEXIST) {
194       /* This can only occur if we crash after before checkpointing */
195       unlink(ij->filename);
196       ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
197     }
198     if(ij->fd < 0) {
199       noitL(noit_error, "Failed to open interim journal '%s': %s\n",
200             ij->filename, strerror(errno));
201       exit(-1);
202     }
203     noit_hash_store(working_set, strdup(fqdn), strlen(fqdn), ij);
204   }
205   else
206     ij = vij;
207
208   return ij;
209 }
210 static void
211 stratcon_datastore_journal(struct sockaddr *remote,
212                            const char *remote_cn, char *line) {
213   interim_journal_t *ij = NULL;
214   char uuid_str[UUID_STR_LEN+1], *cp1, *cp2;
215   const char *fqdn = NULL, *dsn = NULL;
216   int storagenode_id = 0;
217   uuid_t checkid;
218   if(!line) return;
219   /* if it is a UUID based thing, find the storage node */
220   switch(*line) {
221     case 'C':
222     case 'S':
223     case 'M':
224     case 'D':
225       if(line[1] == '\t' && (cp1 = strchr(line+2, '\t')) != NULL &&
226          (cp2 = strchr(cp1+1, '\t')) != NULL &&
227          (cp2-cp1 >= UUID_STR_LEN)) {
228         strlcpy(uuid_str, cp2 - UUID_STR_LEN, sizeof(uuid_str));
229         if(!uuid_parse(uuid_str, checkid)) {
230           ingestor->storage_node_lookup(uuid_str, remote_cn, NULL,
231                                         &storagenode_id, NULL,
232                                         &fqdn, &dsn);
233           ij = interim_journal_get(remote, remote_cn, storagenode_id, fqdn);
234         }
235       }
236       break;
237     case 'n':
238       ij = interim_journal_get(remote,remote_cn,0,NULL);
239       break;
240     default:
241       break;
242   }
243   if(!ij) {
244     noitL(ingest_err, "%d\t%s\n", storagenode_id, line);
245   }
246   else {
247     int len;
248     len = write(ij->fd, line, strlen(line));
249     if(len < 0) {
250       noitL(noit_error, "write to %s failed: %s\n",
251             ij->filename, strerror(errno));
252     }
253   }
254   free(line);
255   return;
256 }
257 static noit_hash_table *
258 stratcon_datastore_journal_remove(struct sockaddr *remote,
259                                   const char *remote_cn) {
260   void *vhash = NULL;
261   if(noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
262     /* pluck it out */
263     noit_hash_delete(&working_sets, remote_cn, strlen(remote_cn), free, NULL);
264   }
265   else {
266     noitL(noit_error, "attempted checkpoint on non-existing workingset: '%s'\n",
267           remote_cn);
268     abort();
269   }
270   return vhash;
271 }
272 void
273 stratcon_datastore_push(stratcon_datastore_op_t op,
274                         struct sockaddr *remote,
275                         const char *remote_cn, void *operand,
276                         eventer_t completion) {
277   syncset_t *syncset;
278   eventer_t e;
279   struct realtime_tracker *rt;
280   struct datastore_onlooker_list *nnode;
281
282   for(nnode = onlookers; nnode; nnode = nnode->next)
283     nnode->dispatch(op,remote,remote_cn,operand);
284
285   switch(op) {
286     case DS_OP_INSERT:
287       stratcon_datastore_journal(remote, remote_cn, (char *)operand);
288       break;
289     case DS_OP_CHKPT:
290       e = eventer_alloc();
291       syncset = calloc(1, sizeof(*syncset));
292       e->mask = EVENTER_ASYNCH;
293       e->callback = stratcon_datastore_journal_sync;
294       syncset->ws = stratcon_datastore_journal_remove(remote, remote_cn);
295       syncset->completion = completion;
296       e->closure = syncset;
297       eventer_add(e);
298       break;
299     case DS_OP_FIND_COMPLETE:
300       rt = operand;
301       ingestor->submit_realtime_lookup(rt, completion);
302       break;
303   }
304 }
305
306 void
307 stratcon_datastore_register_onlooker(void (*f)(stratcon_datastore_op_t,
308                                                struct sockaddr *,
309                                                const char *, void *)) {
310   struct datastore_onlooker_list *nnode;
311   volatile void **vonlookers = (void *)&onlookers;
312   nnode = calloc(1, sizeof(*nnode));
313   nnode->dispatch = f;
314   nnode->next = onlookers;
315   while(noit_atomic_casptr(vonlookers,
316                            nnode, nnode->next) != (void *)nnode->next)
317     nnode->next = onlookers;
318 }
319
320 static int
321 rest_get_noit_config(noit_http_rest_closure_t *restc,
322                      int npats, char **pats) {
323   noit_http_session_ctx *ctx = restc->http_ctx;
324   char *xml = NULL;
325
326   if(npats != 0) {
327     noit_http_response_server_error(ctx, "text/xml");
328     noit_http_response_end(ctx);
329     return 0;
330   }
331
332   xml = ingestor->get_noit_config(restc->remote_cn);
333
334   if(xml == NULL) {
335     char buff[1024];
336     snprintf(buff, sizeof(buff), "<error><remote_cn>%s</remote_cn>"
337                                  "<row_count>%d</row_count></error>\n",
338              restc->remote_cn, 0);
339     noit_http_response_append(ctx, buff, strlen(buff));
340     noit_http_response_not_found(ctx, "text/xml");
341   }
342   else {
343     noit_http_response_append(ctx, xml, strlen(xml));
344     noit_http_response_ok(ctx, "text/xml");
345   }
346
347   if(xml) free(xml);
348   noit_http_response_end(ctx);
349   return 0;
350 }
351
352 void
353 stratcon_datastore_iep_check_preload() {
354   ingestor->iep_check_preload();
355 }
356
357 int
358 stratcon_datastore_saveconfig(void *unused) {
359   return ingestor->save_config();
360 }
361
362 void
363 stratcon_datastore_init() {
364   ds_err = noit_log_stream_find("error/datastore");
365   ds_deb = noit_log_stream_find("debug/datastore");
366   ds_pool_deb = noit_log_stream_find("debug/datastore_pool");
367   ingest_err = noit_log_stream_find("error/ingest");
368   if(!ds_err) ds_err = noit_error;
369   if(!ingest_err) ingest_err = noit_error;
370   if(!noit_conf_get_string(NULL, "/stratcon/database/journal/path",
371                            &basejpath)) {
372     noitL(noit_error, "/stratcon/database/journal/path is unspecified\n");
373     exit(-1);
374   }
375
376   assert(noit_http_rest_register_auth(
377     "GET", "/noits/", "^config$", rest_get_noit_config,
378              noit_http_rest_client_cert_auth
379   ) == 0);
380 }
381
Note: See TracBrowser for help on using the browser.