root/src/modules/handoff_ingestor.c

Revision ecfa20bd1a6282ffdbffe8ed2cea8f775d7a53f9, 9.9 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 4 years ago)

This adds a handoff ingestor, which makes it possible to perform ingestion from an external process that connects to stratcond to be notified of new journals.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2009, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "noit_module.h"
35 #include "eventer/eventer.h"
36 #include "utils/noit_log.h"
37 #include "utils/noit_b64.h"
38 #include "utils/noit_str.h"
39 #include "utils/noit_mkdir.h"
40 #include "utils/noit_getip.h"
41 #include "stratcon_datastore.h"
42 #include "stratcon_realtime_http.h"
43 #include "stratcon_iep.h"
44 #include "noit_conf.h"
45 #include "noit_check.h"
46 #include "noit_rest.h"
47 #include <unistd.h>
48 #include <fcntl.h>
49 #include <netinet/in.h>
50 #include <sys/un.h>
51 #include <dirent.h>
52 #include <arpa/inet.h>
53 #include <sys/mman.h>
54 #include <assert.h>
55 #include <errno.h>
56 #include "handoff_ingestor.xmlh"
57
/* The HTTP session currently subscribed to journal handoff notifications, or
 * NULL when none is attached.  Claimed in handoff_request_dispatcher(); reset
 * to NULL by stratcon_ingest_launch_file_ingestion() when a write to it fails
 * and by handoff_http_handler() on session completion. */
58 static noit_http_session_ctx *the_one_and_only = NULL;
/* Log streams looked up by name in handoff_ingestor_init(); ds_err and
 * ingest_err fall back to noit_error there when the lookup yields NULL. */
59 static noit_log_stream_t ds_err = NULL;
60 static noit_log_stream_t ds_deb = NULL;
/* NOTE(review): ds_pool_deb is not assigned or read in the code shown here. */
61 static noit_log_stream_t ds_pool_deb = NULL;
62 static noit_log_stream_t ingest_err = NULL;
63
/* Forward declaration: stratcon_ingestor_submit_lookup() calls this before
 * its definition appears further down in the file. */
64 static int storage_node_quick_lookup(const char *uuid_str,
65                                      const char *remote_cn,
66                                      int *sid_out, int *storagenode_id_out,
67                                      const char **remote_cn_out,
68                                      const char **fqdn_out,
69                                      const char **dsn_out);
70
/* Journal base path, filled from the /stratcon/database/journal/path config
 * entry in handoff_ingestor_init(). */
71 static char *basejpath = NULL;
/* Keyed by the binary check UUID (UUID_SIZE bytes); each value is a
 * strdup()ed remote CN string.  Maintained by storage_node_quick_lookup(). */
72 static noit_hash_table uuid_map = NOIT_HASH_EMPTY;
73
74 static void
75 stratcon_ingest_iep_check_preload() {
76   noitL(noit_debug, "iep_preload is a noop in handoff mode\n");
77 }
78 static void
79 stratcon_ingestor_submit_lookup(struct realtime_tracker *rt,
80                                 eventer_t completion) {
81   struct realtime_tracker *node;
82
83   for(node = rt; node; node = node->next) {
84     char uuid_str[UUID_STR_LEN+1];
85     const char *fqdn, *dsn, *remote_cn;
86     char remote_ip[32];
87     int storagenode_id;
88
89     uuid_unparse_lower(node->checkid, uuid_str);
90     if(storage_node_quick_lookup(uuid_str, NULL, &node->sid,
91                                  &storagenode_id, &remote_cn, &fqdn, &dsn))
92       continue;
93
94     noitL(noit_debug, "stratcon_ingest_find <- (%d, %s) @ %s\n",
95           node->sid, remote_cn ? remote_cn : "(null)", dsn ? dsn : "(null)");
96
97     if(stratcon_find_noit_ip_by_cn(remote_cn,
98                                    remote_ip, sizeof(remote_ip)) == 0) {
99       node->noit = strdup(remote_ip);
100       noitL(noit_debug, "lookup(cache): %s -> %s\n", remote_cn, node->noit);
101       continue;
102     }
103   }
104   eventer_add(completion);
105 }
106
107 static int
108 storage_node_quick_lookup(const char *uuid_str, const char *remote_cn,
109                           int *sid_out, int *storagenode_id_out,
110                           const char **remote_cn_out,
111                           const char **fqdn_out, const char **dsn_out) {
112   /* only called from the main thread -- no safety issues */
113   void *vstr;
114   const char *actual_remote_cn = NULL;
115   if(remote_cn) actual_remote_cn = remote_cn;
116   uuid_t id;
117   uuid_parse(uuid_str, id);
118   if(noit_hash_retrieve(&uuid_map, id, UUID_SIZE, &vstr)) {
119     char *str = (char *)vstr;
120     if(remote_cn && strcmp(str, remote_cn)) {
121       /* replace with new remote */
122       void *key = malloc(UUID_SIZE);
123       memcpy(key, id, UUID_SIZE);
124       actual_remote_cn = strdup(remote_cn);
125       noit_hash_replace(&uuid_map, key, UUID_SIZE, (void *)actual_remote_cn,
126                         free, free);
127     }
128   }
129   else if(remote_cn) {
130     void *key = malloc(UUID_SIZE);
131     memcpy(key, id, UUID_SIZE);
132     noit_hash_store(&uuid_map, key, UUID_SIZE, strdup(remote_cn));
133   }
134   if(!actual_remote_cn) actual_remote_cn = "[[null]]";
135
136   if(sid_out) *sid_out = 0;
137   if(storagenode_id_out) *storagenode_id_out = 0;
138   if(remote_cn_out) *remote_cn_out = actual_remote_cn;
139   if(fqdn_out) *fqdn_out = "";
140   if(dsn_out) *dsn_out = "";
141   return 0;
142 }
143
/*
 * save_config entry of the handoff ingestor API.
 *
 * Collects the pieces a config snapshot would consist of -- a local IPv4
 * address string, the current time as a decimal string, and the in-memory
 * XML configuration -- but does not write them anywhere yet; the write is
 * still stubbed out in the #if 0 section.
 *
 * Returns 0 when noit_conf_xml_in_mem() produced a buffer, -1 otherwise.
 */
static int
stratcon_ingest_saveconfig() {
  int rv = -1;
  char *buff;
  char ipv4_str[32], time_as_str[20];
  struct in_addr r, l;
  size_t len;

  /* Probe address 4.2.2.1.  The octets must be combined with bitwise OR;
   * logical || would collapse the whole expression to 1. */
  r.s_addr = htonl((4 << 24) | (2 << 16) | (2 << 8) | 1);
  memset(&l, 0, sizeof(l));
  /* NOTE(review): presumably fills `l` with the local address used to reach
   * `r` -- confirm against noit_getip_ipv4(). */
  noit_getip_ipv4(r, &l);
  /* Ignore the error.. what are we going to do anyway */
  if(inet_ntop(AF_INET, &l, ipv4_str, sizeof(ipv4_str)) == NULL)
    strlcpy(ipv4_str, "0.0.0.0", sizeof(ipv4_str));

  buff = noit_conf_xml_in_mem(&len);
  if(!buff) goto bail;

  snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
#if 0
  /* writev this somewhere */
  (ipv4_str, strlen(ipv4_str));
  ("", 0);
  ("stratcond", 9);
  (time_as_str, strlen(time_as_str));
  (buff, len);
#endif
  rv = 0;
  free(buff);
bail:
  return rv;
}
176
177 static void
178 stratcon_ingest_launch_file_ingestion(const char *path,
179                                       const char *remote_str,
180                                       const char *remote_cn,
181                                       const char *id_str) {
182   char msg[PATH_MAX + 7]; /*file:\r\n*/
183   noitL(noit_error, " handoff -> %s\n", path);
184   if(the_one_and_only) {
185     noit_http_session_ctx *ctx = the_one_and_only;
186     snprintf(msg, sizeof(msg), "file:%s\r\n", path);
187     if(noit_http_response_append(ctx,msg,strlen(msg)) == noit_false ||
188        noit_http_response_flush(ctx, noit_false) == noit_false) {
189       noitL(noit_error, "handoff endpoint disconnected\n");
190       the_one_and_only = NULL;
191     }
192   }
193 }
194
195 static int
196 handoff_request_dispatcher(noit_http_session_ctx *ctx) {
197   char *hello = "message:hello\r\n";
198   if(the_one_and_only) {
199     hello = "message:already connected\r\n";
200     noit_http_response_server_error(ctx, "text/plain");
201     noit_http_response_append(ctx, hello, strlen(hello));
202     noit_http_response_end(ctx);
203     return 0;
204   }
205   the_one_and_only = ctx;
206   noit_http_response_status_set(ctx, 200, "OK");
207   noit_http_response_option_set(ctx, NOIT_HTTP_CHUNKED);
208   noit_http_response_header_set(ctx, "Content-Type", "text/plain");
209   noit_http_response_append(ctx, hello, strlen(hello));
210   noit_http_response_flush(ctx, noit_false);
211   return EVENTER_EXCEPTION;
212 }
213
214 static int
215 handoff_http_handler(eventer_t e, int mask, void *closure,
216                      struct timeval *now) {
217   int done = 0, rv;
218   acceptor_closure_t *ac = closure;
219   noit_http_session_ctx *http_ctx = ac->service_ctx;
220   rv = noit_http_session_drive(e, mask, http_ctx, now, &done);
221   if(done) {
222     the_one_and_only = NULL;
223     acceptor_closure_free(ac);
224   }
225   return rv;
226 }
227
228 static int
229 handoff_stream(noit_http_rest_closure_t *restc, int npats, char **pats) {
230   noit_http_session_ctx *ctx = restc->http_ctx;
231   noit_http_connection *conn = noit_http_session_connection(ctx);
232   eventer_t e;
233   acceptor_closure_t *ac = restc->ac;
234
235   if(ac->service_ctx_free)
236     ac->service_ctx_free(ac->service_ctx);
237   ac->service_ctx = ctx;
238   ac->service_ctx_free = noit_http_ctx_acceptor_free;
239
240   e = noit_http_connection_event(conn);
241   e->callback = handoff_http_handler;
242   noit_http_session_set_dispatcher(ctx, handoff_request_dispatcher, NULL);
243   return handoff_request_dispatcher(ctx);
244 }
245
246 static ingestor_api_t handoff_ingestor_api = {
247   .launch_file_ingestion = stratcon_ingest_launch_file_ingestion,
248   .iep_check_preload = stratcon_ingest_iep_check_preload,
249   .storage_node_lookup = storage_node_quick_lookup,
250   .submit_realtime_lookup = stratcon_ingestor_submit_lookup,
251   .get_noit_config = NULL,
252   .save_config = stratcon_ingest_saveconfig
253 };
254
255 static int handoff_ingestor_config(noit_module_generic_t *self, noit_hash_table *o) {
256   return 0;
257 }
258 static int handoff_ingestor_onload(noit_image_t *self) {
259   return 0;
260 }
261 static int handoff_ingestor_init(noit_module_generic_t *self) {
262   ds_err = noit_log_stream_find("error/datastore");
263   ds_deb = noit_log_stream_find("debug/datastore");
264   ingest_err = noit_log_stream_find("error/ingest");
265   if(!ds_err) ds_err = noit_error;
266   if(!ingest_err) ingest_err = noit_error;
267   if(!noit_conf_get_string(NULL, "/stratcon/database/journal/path",
268                            &basejpath)) {
269     noitL(noit_error, "/stratcon/database/journal/path is unspecified\n");
270     exit(-1);
271   }
272   noitL(noit_error, "registering /handoff/journals REST endpoint\n");
273   assert(noit_http_rest_register_auth(
274     "GET", "/handoff/", "^journals$", handoff_stream,
275     noit_http_rest_client_cert_auth
276   ) == 0);
277   return stratcon_datastore_set_ingestor(&handoff_ingestor_api);
278 }
279
/*
 * Module descriptor exported by this file.  The inner braces carry the
 * generic magic and ABI version constants, the module name, a one-line
 * description, the XML description symbol (NOTE(review): presumably supplied
 * by the included handoff_ingestor.xmlh -- confirm) and the onload hook; the
 * outer initializer then lists the config and init hooks.
 */
280 noit_module_generic_t handoff_ingestor = {
281   {
282     NOIT_GENERIC_MAGIC,
283     NOIT_GENERIC_ABI_VERSION,
284     "handoff_ingestor",
285     "data ingestion that just hands off to another process",
286     handoff_ingestor_xml_description,
287     handoff_ingestor_onload,
288   },
289   handoff_ingestor_config,
290   handoff_ingestor_init
291 };
Note: See TracBrowser for help on using the browser.