root/src/stratcon_datastore.c

Revision a9077178423e39a94a9b624e44cd4b37899d6fd3, 18.7 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

refs #122, this stuff works well enough to pull dev back into trunk.
Closing the #122 branch.

flag-day(noitd,stratcond)

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "utils/noit_log.h"
9 #include "utils/noit_b64.h"
10 #include "stratcon_datastore.h"
11 #include "stratcon_realtime_http.h"
12 #include "noit_conf.h"
13 #include "noit_check.h"
14 #include <unistd.h>
15 #include <netinet/in.h>
16 #include <sys/un.h>
17 #include <arpa/inet.h>
18 #include <libpq-fe.h>
19 #include <zlib.h>
20
/*
 * SQL statement cache.
 *
 * Each statement is described by a pair: the configuration path it is
 * read from (<name>_conf) and a pointer holding the statement text once
 * it has been fetched.  The pointers start out NULL and are populated on
 * first use by GET_QUERY(<name>).
 */

/* Look up a check by its sid. */
static const char *check_find_conf = "/stratcon/database/statements/findcheck";
static char *check_find = NULL;

/* Insert for 'C' (check) records. */
static const char *check_insert_conf = "/stratcon/database/statements/check";
static char *check_insert = NULL;

/* Insert for 'S' (status) records. */
static const char *status_insert_conf = "/stratcon/database/statements/status";
static char *status_insert = NULL;

/* Insert for 'M' (metric) records carrying a numeric value. */
static const char *metric_insert_numeric_conf = "/stratcon/database/statements/metric_numeric";
static char *metric_insert_numeric = NULL;

/* Insert for 'M' (metric) records carrying a string value. */
static const char *metric_insert_text_conf = "/stratcon/database/statements/metric_text";
static char *metric_insert_text = NULL;

/* Insert for 'n' (configuration snapshot) records. */
static const char *config_insert_conf = "/stratcon/database/statements/config";
static char *config_insert = NULL;
33
34 static struct datastore_onlooker_list {
35   void (*dispatch)(stratcon_datastore_op_t, struct sockaddr *, void *);
36   struct datastore_onlooker_list *next;
37 } *onlookers = NULL;
38
/*
 * Ensure the cached statement `a` is loaded.  If it is still NULL it is
 * fetched from the configuration path held in `a_conf`; when that fetch
 * fails control jumps to the enclosing function's `bad_row` label.
 */
#define GET_QUERY(a) do { \
  if((a) == NULL && \
     !noit_conf_get_string(NULL, a ## _conf, &(a))) \
    goto bad_row; \
} while(0)
44
/* Upper bound on the number of bind parameters a single statement uses. */
#define MAX_PARAMS 8

/*
 * Fields shared by every structure that runs a parameterised statement.
 * The DECLARE_PARAM_* and PG_* macros reach these through a local
 * pointer named `d`.
 */
#define POSTGRES_PARTS \
  PGresult *res;                  /* result of the latest PG_EXEC */ \
  int rv;                         /* PQresultStatus() of that result */ \
  int nparams;                    /* number of parameters declared so far */ \
  int metric_type;                /* type character of an 'M' record */ \
  char *paramValues[MAX_PARAMS];  /* parameter text, NULL for SQL NULL */ \
  int paramLengths[MAX_PARAMS];   /* parameter lengths */ \
  int paramFormats[MAX_PARAMS];   /* 0 == text format */ \
  int paramAllocd[MAX_PARAMS];    /* non-zero if paramValues[i] must be freed */
55
56 typedef struct ds_single_detail {
57   POSTGRES_PARTS
58 } ds_single_detail;
59 typedef struct ds_job_detail {
60   /* Postgres specific stuff */
61   POSTGRES_PARTS
62
63   char *data;  /* The raw string, NULL means the stream is done -- commit. */
64   struct realtime_tracker *rt;
65
66   int problematic;
67   eventer_t completion_event; /* This event should be registered if non NULL */
68   struct ds_job_detail *next;
69 } ds_job_detail;
70
71 typedef struct {
72   struct sockaddr *remote;
73   eventer_jobq_t  *jobq;
74   /* Postgres specific stuff */
75   PGconn          *dbh;
76   ds_job_detail   *head;
77   ds_job_detail   *tail;
78 } conn_q;
79
80 static void
81 free_params(ds_single_detail *d) {
82   int i;
83   for(i=0; i<d->nparams; i++)
84     if(d->paramAllocd[i] && d->paramValues[i])
85       free(d->paramValues[i]);
86 }
87
88 static void
89 __append(conn_q *q, ds_job_detail *d) {
90   d->next = NULL;
91   if(!q->head) q->head = q->tail = d;
92   else {
93     q->tail->next = d;
94     q->tail = d;
95   }
96 }
97 static void
98 __remove_until(conn_q *q, ds_job_detail *d) {
99   ds_job_detail *next;
100   while(q->head && q->head != d) {
101     next = q->head;
102     q->head = q->head->next;
103     free_params((ds_single_detail *)next);
104     if(next->data) free(next->data);
105     free(next);
106   }
107   if(!q->head) q->tail = NULL;
108 }
109
110 noit_hash_table ds_conns;
111
112 conn_q *
113 __get_conn_q_for_remote(struct sockaddr *remote) {
114   void *vcq;
115   conn_q *cq;
116   char queue_name[128] = "datastore_";
117   static const char __zeros[4] = { 0 };
118   int len = 0;
119   if(remote) {
120     switch(remote->sa_family) {
121       case AF_INET:
122         len = sizeof(struct sockaddr_in);
123         inet_ntop(remote->sa_family, &((struct sockaddr_in *)remote)->sin_addr,
124                   queue_name + strlen("datastore_"), len);
125         break;
126       case AF_INET6:
127        len = sizeof(struct sockaddr_in6);
128         inet_ntop(remote->sa_family, &((struct sockaddr_in6 *)remote)->sin6_addr,
129                   queue_name + strlen("datastore_"), len);
130        break;
131       case AF_UNIX:
132         len = SUN_LEN(((struct sockaddr_un *)remote));
133         snprintf(queue_name, sizeof(queue_name), "datastore_%s", ((struct sockaddr_un *)remote)->sun_path);
134         break;
135       default: return NULL;
136     }
137   }
138   else {
139     /* This is a dummy connection */
140     remote = (struct sockaddr *)__zeros;
141     snprintf(queue_name, sizeof(queue_name), "datastore_default");
142     len = 4;
143   }
144   if(noit_hash_retrieve(&ds_conns, (const char *)remote, len, &vcq))
145     return vcq;
146   cq = calloc(1, sizeof(*cq));
147   cq->remote = malloc(len);
148   memcpy(cq->remote, remote, len);
149   cq->jobq = calloc(1, sizeof(*cq->jobq));
150   eventer_jobq_init(cq->jobq, queue_name);
151   cq->jobq->backq = eventer_default_backq();
152   /* Add one thread */
153   eventer_jobq_increase_concurrency(cq->jobq);
154   noit_hash_store(&ds_conns, (const char *)cq->remote, len, cq);
155   return cq;
156 }
157
/* Result of trying to run one statement against the database. */
typedef enum {
  DS_EXEC_SUCCESS    = 0, /* statement ran */
  DS_EXEC_ROW_FAILED = 1, /* this row is bad */
  DS_EXEC_TXN_FAILED = 2  /* the whole transaction is unusable */
} execute_outcome_t;
163
/*
 * Duplicate at most `len` bytes of `src` into a freshly malloc'd,
 * NUL-terminated string.  Copying stops early at the first NUL found in
 * `src`.  A non-positive `len` yields an empty string.  The caller owns
 * (and must free) the result.
 */
static char *
__noit__strndup(const char *src, int len) {
  int n = 0;
  char *copy;

  while(n < len && src[n] != '\0') n++;

  copy = malloc(n + 1);
  memcpy(copy, src, n);
  copy[n] = '\0';
  return copy;
}
/*
 * Append a text parameter to `d`, copying at most `len` bytes of `str`.
 * The literal text "[[null]]" is translated into an SQL NULL (NULL value,
 * zero length, nothing to free).  Note that `len` may be evaluated more
 * than once.
 */
#define DECLARE_PARAM_STR(str, len) do { \
  char *copy__ = __noit__strndup(str, len); \
  d->paramFormats[d->nparams] = 0; \
  if(strcmp(copy__, "[[null]]") == 0) { \
    free(copy__); \
    d->paramValues[d->nparams] = NULL; \
    d->paramLengths[d->nparams] = 0; \
    d->paramAllocd[d->nparams] = 0; \
  } \
  else { \
    d->paramValues[d->nparams] = copy__; \
    d->paramLengths[d->nparams] = len; \
    d->paramAllocd[d->nparams] = 1; \
  } \
  d->nparams++; \
} while(0)
/*
 * Append an integer parameter to `d`.  The value is rendered with "%d"
 * and then declared as a text parameter.
 */
#define DECLARE_PARAM_INT(i) do { \
  char buffer__[32]; \
  int buffer__len = snprintf(buffer__, sizeof(buffer__), "%d", (i)); \
  DECLARE_PARAM_STR(buffer__, buffer__len); \
} while(0)
195
/*
 * Fetch column `name` of row `row` from d->res into `dest`.  `dest` is
 * NULL when the column does not exist in the result or holds SQL NULL;
 * otherwise it points at the value returned by PQgetvalue().
 */
#define PG_GET_STR_COL(dest, row, name) do { \
  int col__ = PQfnumber(d->res, name); \
  if(col__ < 0 || PQgetisnull(d->res, row, col__)) \
    dest = NULL; \
  else \
    dest = PQgetvalue(d->res, row, col__); \
} while(0)
203
/*
 * Run statement `cmd` on cq->dbh using the parameters collected in `d`.
 * The result lands in d->res and its status in d->rv.  Any status other
 * than PGRES_COMMAND_OK / PGRES_TUPLES_OK is logged, the result is
 * cleared and control jumps to the enclosing function's `bad_row` label.
 * On success the caller is responsible for PQclear(d->res).
 */
#define PG_EXEC(cmd) do { \
  d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  switch(d->rv) { \
    case PGRES_COMMAND_OK: \
    case PGRES_TUPLES_OK: \
      break; \
    default: \
      noitL(noit_error, "stratcon datasource bad (%d): %s\n", \
            d->rv, PQresultErrorMessage(d->res)); \
      PQclear(d->res); \
      goto bad_row; \
  } \
} while(0)
217
218 execute_outcome_t
219 stratcon_datastore_find(conn_q *cq, ds_job_detail *d) {
220   char *val;
221   int row_count;
222
223   if(!d->nparams) DECLARE_PARAM_INT(d->rt->sid);
224   GET_QUERY(check_find);
225   PG_EXEC(check_find);
226   row_count = PQntuples(d->res);
227   if(row_count != 1) goto bad_row;
228
229   /* Get the check uuid */
230   PG_GET_STR_COL(val, 0, "id");
231   if(!val) goto bad_row;
232   if(uuid_parse(val, d->rt->checkid)) goto bad_row;
233
234   /* Get the remote_address (which noit owns this) */
235   PG_GET_STR_COL(val, 0, "remote_address");
236   if(!val) goto bad_row;
237   d->rt->noit = strdup(val);
238
239   PQclear(d->res);
240   return DS_EXEC_SUCCESS;
241  bad_row:
242   return DS_EXEC_ROW_FAILED;
243 }
244 execute_outcome_t
245 stratcon_datastore_execute(conn_q *cq, struct sockaddr *r, ds_job_detail *d) {
246   int type, len;
247   char *final_buff;
248   uLong final_len, actual_final_len;;
249   char *token;
250
251   type = d->data[0];
252
253   /* Parse the log line, but only if we haven't already */
254   if(!d->nparams) {
255     char raddr[128];
256     char *scp, *ecp;
257
258     /* setup our remote address */
259     raddr[0] = '\0';
260     switch(r->sa_family) {
261       case AF_INET:
262         inet_ntop(AF_INET, &(((struct sockaddr_in *)r)->sin_addr),
263                   raddr, sizeof(raddr));
264         break;
265       case AF_INET6:
266         inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)r)->sin6_addr),
267                   raddr, sizeof(raddr));
268         break;
269       default:
270         noitL(noit_error, "remote address of family %d\n", r->sa_family);
271     }
272  
273     scp = d->data;
274 #define PROCESS_NEXT_FIELD(t,l) do { \
275   if(!*scp) goto bad_row; \
276   ecp = strchr(scp, '\t'); \
277   if(!ecp) goto bad_row; \
278   token = scp; \
279   len = (ecp-scp); \
280   scp = ecp + 1; \
281 } while(0)
282 #define PROCESS_LAST_FIELD(t,l) do { \
283   if(!*scp) ecp = scp; \
284   else { \
285     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
286     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
287   } \
288   t = scp; \
289   l = (ecp-scp); \
290 } while(0)
291
292     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
293     switch(type) {
294       /* See noit_check_log.c for log description */
295       case 'n':
296         DECLARE_PARAM_STR(raddr, strlen(raddr));
297         DECLARE_PARAM_STR("noitd",5); /* node_type */
298         PROCESS_NEXT_FIELD(token,len);
299         DECLARE_PARAM_STR(token,len); /* timestamp */
300
301         /* This is the expected uncompressed len */
302         PROCESS_NEXT_FIELD(token,len);
303         final_len = atoi(token);
304         final_buff = malloc(final_len);
305         if(!final_buff) goto bad_row;
306  
307         /* The last token is b64 endoded and compressed.
308          * we need to decode it, declare it and then free it.
309          */
310         PROCESS_LAST_FIELD(token, len);
311         /* We can in-place decode this */
312         len = noit_b64_decode((char *)token, len,
313                               (unsigned char *)token, len);
314         if(len <= 0) {
315           noitL(noit_error, "noitd config base64 decoding error.\n");
316           free(final_buff);
317           goto bad_row;
318         }
319         actual_final_len = final_len;
320         if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
321                               (unsigned char *)token, len)) {
322           noitL(noit_error, "noitd config decompression failure.\n");
323           free(final_buff);
324           goto bad_row;
325         }
326         if(final_len != actual_final_len) {
327           noitL(noit_error, "noitd config decompression error.\n");
328           free(final_buff);
329           goto bad_row;
330         }
331         DECLARE_PARAM_STR(final_buff, final_len);
332         free(final_buff);
333         break;
334       case 'C':
335         DECLARE_PARAM_STR(raddr, strlen(raddr));
336         PROCESS_NEXT_FIELD(token,len);
337         DECLARE_PARAM_STR(token,len); /* timestamp */
338         PROCESS_NEXT_FIELD(token, len);
339         DECLARE_PARAM_STR(token,len); /* uuid */
340         PROCESS_NEXT_FIELD(token, len);
341         DECLARE_PARAM_STR(token,len); /* target */
342         PROCESS_NEXT_FIELD(token, len);
343         DECLARE_PARAM_STR(token,len); /* module */
344         PROCESS_LAST_FIELD(token, len);
345         DECLARE_PARAM_STR(token,len); /* name */
346         break;
347       case 'M':
348         PROCESS_NEXT_FIELD(token,len);
349         DECLARE_PARAM_STR(token,len); /* timestamp */
350         PROCESS_NEXT_FIELD(token, len);
351         DECLARE_PARAM_STR(token,len); /* uuid */
352         PROCESS_NEXT_FIELD(token, len);
353         DECLARE_PARAM_STR(token,len); /* name */
354         PROCESS_NEXT_FIELD(token,len);
355         d->metric_type = *token;
356         PROCESS_LAST_FIELD(token,len);
357         DECLARE_PARAM_STR(token,len); /* value */
358         break;
359       case 'S':
360         PROCESS_NEXT_FIELD(token,len);
361         DECLARE_PARAM_STR(token,len); /* timestamp */
362         PROCESS_NEXT_FIELD(token, len);
363         DECLARE_PARAM_STR(token,len); /* uuid */
364         PROCESS_NEXT_FIELD(token, len);
365         DECLARE_PARAM_STR(token,len); /* state */
366         PROCESS_NEXT_FIELD(token, len);
367         DECLARE_PARAM_STR(token,len); /* availability */
368         PROCESS_NEXT_FIELD(token, len);
369         DECLARE_PARAM_STR(token,len); /* duration */
370         PROCESS_LAST_FIELD(token,len);
371         DECLARE_PARAM_STR(token,len); /* status */
372         break;
373       default:
374         goto bad_row;
375     }
376
377   }
378
379   /* Now execute the query */
380   switch(type) {
381     case 'n':
382       GET_QUERY(config_insert);
383       PG_EXEC(config_insert);
384       PQclear(d->res);
385       break;
386     case 'C':
387       GET_QUERY(check_insert);
388       PG_EXEC(check_insert);
389       PQclear(d->res);
390       break;
391     case 'S':
392       GET_QUERY(status_insert);
393       PG_EXEC(status_insert);
394       PQclear(d->res);
395       break;
396     case 'M':
397       switch(d->metric_type) {
398         case METRIC_INT32:
399         case METRIC_UINT32:
400         case METRIC_INT64:
401         case METRIC_UINT64:
402         case METRIC_DOUBLE:
403           GET_QUERY(metric_insert_numeric);
404           PG_EXEC(metric_insert_numeric);
405           PQclear(d->res);
406           break;
407         case METRIC_STRING:
408           GET_QUERY(metric_insert_text);
409           PG_EXEC(metric_insert_text);
410           PQclear(d->res);
411           break;
412         default:
413           goto bad_row;
414       }
415       break;
416     default:
417       /* should never get here */
418       goto bad_row;
419   }
420   return DS_EXEC_SUCCESS;
421  bad_row:
422   return DS_EXEC_ROW_FAILED;
423 }
424 static int
425 stratcon_database_connect(conn_q *cq) {
426   char dsn[512];
427   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
428   const char *k, *v;
429   int klen;
430   noit_hash_table *t;
431
432   dsn[0] = '\0';
433   t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
434   while(noit_hash_next_str(t, &iter, &k, &klen, &v)) {
435     if(dsn[0]) strlcat(dsn, " ", sizeof(dsn));
436     strlcat(dsn, k, sizeof(dsn));
437     strlcat(dsn, "=", sizeof(dsn));
438     strlcat(dsn, v, sizeof(dsn));
439   }
440
441   if(cq->dbh) {
442     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
443     PQreset(cq->dbh);
444     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
445     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
446           dsn, PQerrorMessage(cq->dbh));
447     return -1;
448   }
449
450   cq->dbh = PQconnectdb(dsn);
451   if(!cq->dbh) return -1;
452   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
453   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
454         dsn, PQerrorMessage(cq->dbh));
455   return -1;
456 }
457 static int
458 stratcon_datastore_savepoint_op(conn_q *cq, const char *p,
459                                 const char *name) {
460   int rv = -1;
461   PGresult *res;
462   char cmd[128];
463   strlcpy(cmd, p, sizeof(cmd));
464   strlcat(cmd, name, sizeof(cmd));
465   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
466   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
467   PQclear(res);
468   return rv;
469 }
470 static int
471 stratcon_datastore_do(conn_q *cq, const char *cmd) {
472   PGresult *res;
473   int rv = -1;
474   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
475   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
476   PQclear(res);
477   return rv;
478 }
/*
 * Transaction helpers for stratcon_datastore_asynch_execute().  They rely
 * on that function's locals `cq`, `current` and `last_sp` and on its
 * `full_monty` label.
 */

/* Drop the database connection and restart the whole batch. */
#define BUSTED(cq) do { \
  PQfinish((cq)->dbh); \
  (cq)->dbh = NULL; \
  goto full_monty; \
} while(0)

/* Open savepoint `name` and remember the entry it was opened at. */
#define SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "SAVEPOINT ", name) != 0) \
    BUSTED(cq); \
  last_sp = current; \
} while(0)

/* Undo everything since savepoint `name` and forget the savepoint entry. */
#define ROLLBACK_TO_SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name) != 0) \
    BUSTED(cq); \
  last_sp = NULL; \
} while(0)

/* Release savepoint `name` and forget the savepoint entry. */
#define RELEASE_SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "RELEASE SAVEPOINT ", name) != 0) \
    BUSTED(cq); \
  last_sp = NULL; \
} while(0)
498 int
499 stratcon_datastore_asynch_lookup(eventer_t e, int mask, void *closure,
500                                  struct timeval *now) {
501   conn_q *cq = closure;
502   ds_job_detail *current;
503   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
504
505   if(!cq->head) return 0;
506
507   stratcon_database_connect(cq);
508
509   current = cq->head;
510   while(current) {
511     if(current->rt) {
512       stratcon_datastore_find(cq, current);
513       current = current->next;
514     }
515     else if(current->completion_event) {
516       eventer_add(current->completion_event);
517       current = current->next;
518       __remove_until(cq, current);
519     }
520     else current = current->next;
521   }
522   return 0;
523 }
524 int
525 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure,
526                                   struct timeval *now) {
527   int i;
528   conn_q *cq = closure;
529   ds_job_detail *current, *last_sp;
530   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
531   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
532   if(!cq->head) return 0;
533
534  full_monty:
535   /* Make sure we have a connection */
536   i = 1;
537   while(stratcon_database_connect(cq)) {
538     noitL(noit_error, "Error connecting to database\n");
539     sleep(i);
540     i *= 2;
541     i = MIN(i, 16);
542   }
543
544   current = cq->head;
545   last_sp = NULL;
546   if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq);
547   while(current) {
548     execute_outcome_t rv;
549     if(current->data) {
550       if(!last_sp) SAVEPOINT("batch");
551  
552       if(current->problematic) {
553         noitL(noit_error, "Failed noit line: %s", current->data);
554         RELEASE_SAVEPOINT("batch");
555         current = current->next;
556         continue;
557       }
558       rv = stratcon_datastore_execute(cq, cq->remote, current);
559       switch(rv) {
560         case DS_EXEC_SUCCESS:
561           current = current->next;
562           break;
563         case DS_EXEC_ROW_FAILED:
564           /* rollback to savepoint, mark this record as bad and start again */
565           current->problematic = 1;
566           current = last_sp;
567           ROLLBACK_TO_SAVEPOINT("batch");
568           break;
569         case DS_EXEC_TXN_FAILED:
570           BUSTED(cq);
571       }
572     }
573     if(current->completion_event) {
574       if(last_sp) RELEASE_SAVEPOINT("batch");
575       if(stratcon_datastore_do(cq, "COMMIT")) BUSTED(cq);
576       eventer_add(current->completion_event);
577       current = current->next;
578       __remove_until(cq, current);
579     }
580   }
581   return 0;
582 }
583 void
584 stratcon_datastore_push(stratcon_datastore_op_t op,
585                         struct sockaddr *remote, void *operand) {
586   conn_q *cq;
587   eventer_t e;
588   ds_job_detail *dsjd;
589   struct datastore_onlooker_list *nnode;
590
591   for(nnode = onlookers; nnode; nnode = nnode->next)
592     nnode->dispatch(op,remote,operand);
593
594   cq = __get_conn_q_for_remote(remote);
595   dsjd = calloc(1, sizeof(*dsjd));
596   switch(op) {
597     case DS_OP_FIND:
598       dsjd->rt = operand;
599       __append(cq, dsjd);
600       break;
601     case DS_OP_INSERT:
602       dsjd->data = operand;
603       __append(cq, dsjd);
604       break;
605     case DS_OP_FIND_COMPLETE:
606     case DS_OP_CHKPT:
607       dsjd->completion_event = operand;
608       __append(cq,dsjd);
609       e = eventer_alloc();
610       e->mask = EVENTER_ASYNCH;
611       if(op == DS_OP_FIND_COMPLETE)
612         e->callback = stratcon_datastore_asynch_lookup;
613       else if(op == DS_OP_CHKPT)
614         e->callback = stratcon_datastore_asynch_execute;
615       e->closure = cq;
616       eventer_add_asynch(cq->jobq, e);
617       break;
618   }
619 }
620
621 int
622 stratcon_datastore_saveconfig(void *unused) {
623   int rv = -1;
624   conn_q _cq = { 0 }, *cq = &_cq;
625   char *buff;
626   ds_single_detail _d = { 0 }, *d = &_d;
627
628   if(stratcon_database_connect(cq) == 0) {
629     char time_as_str[20];
630     size_t len;
631     buff = noit_conf_xml_in_mem(&len);
632     if(!buff) goto bad_row;
633
634     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
635     DECLARE_PARAM_STR("0.0.0.0", 7);
636     DECLARE_PARAM_STR("stratcond", 9);
637     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
638     DECLARE_PARAM_STR(buff, len);
639     free(buff);
640
641     GET_QUERY(config_insert);
642     PG_EXEC(config_insert);
643     PQclear(d->res);
644     rv = 0;
645
646     bad_row:
647       free_params(d);
648   }
649   if(cq->dbh) PQfinish(cq->dbh);
650   return rv;
651 }
652
653 void
654 stratcon_datastore_register_onlooker(void (*f)(stratcon_datastore_op_t,
655                                                struct sockaddr *, void *)) {
656   struct datastore_onlooker_list *nnode;
657   nnode = calloc(1, sizeof(*nnode));
658   nnode->dispatch = f;
659   nnode->next = onlookers;
660   while(noit_atomic_casptr((void **)&onlookers, nnode, nnode->next) != (vpsized_int)nnode->next)
661     nnode->next = onlookers;
662 }
Note: See TracBrowser for help on using the browser.