root/src/stratcon_datastore.c

Revision df5d455d9be20ec08b04b04840b6a9e5ac919b34, 6.2 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 7 years ago)

datasource driver based around postgres libpq, logic is there, just need to do the INSERTs

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "utils/noit_log.h"
9 #include "stratcon_datastore.h"
10 #include "noit_conf.h"
11 #include <unistd.h>
12 #include <libpq-fe.h>
13
14 typedef struct ds_job_detail {
15   char *data;  /* The raw string, NULL means the stream is done -- commit. */
16   int problematic;
17   eventer_t completion_event; /* This event should be registered if non NULL */
18   struct ds_job_detail *next;
19 } ds_job_detail;
20
21 typedef struct {
22   struct sockaddr *remote;
23   eventer_jobq_t  *jobq;
24   PGconn          *dbh;
25   ds_job_detail   *head;
26   ds_job_detail   *tail;
27 } conn_q;
28
29 void __append(conn_q *q, ds_job_detail *d) {
30   d->next = NULL;
31   if(!q->head) q->head = q->tail = d;
32   else {
33     q->tail->next = d;
34     q->tail = d;
35   }
36 }
37 void __remove_until(conn_q *q, ds_job_detail *d) {
38   ds_job_detail *next;
39   while(q->head && q->head != d) {
40     next = q->head;
41     q->head = q->head->next;
42     if(next->data) free(next->data);
43     free(next);
44   }
45   if(!q->head) q->tail = NULL;
46 }
47
48 noit_hash_table ds_conns;
49
50 conn_q *
51 __get_conn_q_for_remote(struct sockaddr *remote) {
52   conn_q *cq;
53   if(noit_hash_retrieve(&ds_conns, (const char *)remote, remote->sa_len,
54                         (void **)&cq))
55     return cq;
56   cq = calloc(1, sizeof(*cq));
57   cq->remote = malloc(remote->sa_len);
58   memcpy(cq->remote, remote, remote->sa_len);
59   cq->jobq = calloc(1, sizeof(*cq->jobq));
60   eventer_jobq_init(cq->jobq);
61   cq->jobq->backq = eventer_default_backq();
62   /* Add one thread */
63   eventer_jobq_increase_concurrency(cq->jobq);
64   noit_hash_store(&ds_conns, (const char *)cq->remote, cq->remote->sa_len, cq);
65   return cq;
66 }
67
/* Result of attempting to store a single queued row. */
typedef enum {
  /* The row was stored. */
  DS_EXEC_SUCCESS    = 0,
  /* Only this row failed; the caller rolls back to its savepoint. */
  DS_EXEC_ROW_FAILED = 1,
  /* The whole transaction is unusable; the caller reconnects. */
  DS_EXEC_TXN_FAILED = 2
} execute_outcome_t;
73
74 execute_outcome_t
75 stratcon_datastore_execute(conn_q *cq, struct sockaddr *r, const char *data) {
76  
77   return DS_EXEC_ROW_FAILED;
78 }
79 static int
80 stratcon_database_connect(conn_q *cq) {
81   char dsn[512];
82   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
83   const char *k, *v;
84   int klen;
85   noit_hash_table *t;
86
87   if(cq->dbh) {
88     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
89     PQreset(cq->dbh);
90     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
91     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
92           dsn, PQerrorMessage(cq->dbh));
93     return -1;
94   }
95
96   dsn[0] = '\0';
97   t = noit_conf_get_hash(NULL, "/stratcon/noits/dbconfig");
98   while(noit_hash_next(t, &iter, &k, &klen, (void **)&v)) {
99     if(dsn[0]) strlcat(dsn, " ", sizeof(dsn));
100     strlcat(dsn, k, sizeof(dsn));
101     strlcat(dsn, "=", sizeof(dsn));
102     strlcat(dsn, v, sizeof(dsn));
103   }
104   cq->dbh = PQconnectdb(dsn);
105   if(!cq->dbh) return -1;
106   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
107   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
108         dsn, PQerrorMessage(cq->dbh));
109   return -1;
110 }
111 static int
112 stratcon_datastore_savepoint_op(conn_q *cq, const char *p,
113                                 const char *name) {
114   int rv;
115   PGresult *res;
116   char cmd[128];
117   strlcpy(cmd, p, sizeof(cmd));
118   strlcat(cmd, name, sizeof(cmd));
119   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
120   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
121   PQclear(res);
122   return rv;
123 }
124 static int
125 stratcon_datastore_do(conn_q *cq, const char *cmd) {
126   PGresult *res;
127   int rv = -1;
128   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
129   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
130   PQclear(res);
131   return rv;
132 }
/*
 * Helpers for stratcon_datastore_asynch_execute().  They rely on names in
 * the function that expands them: a `full_monty` label, and for the
 * savepoint macros the locals `cq`, `current` and `last_sp`.
 */

/* Drop the database connection and restart from the full_monty label. */
#define BUSTED(cq) do { \
  PQfinish((cq)->dbh); \
  (cq)->dbh = NULL; \
  goto full_monty; \
} while(0)

/* Open savepoint `name` and remember the current record as its start;
 * failure to create it is treated as a busted connection. */
#define SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "SAVEPOINT ", name)) { \
    BUSTED(cq); \
  } \
  last_sp = current; \
} while(0)

/* Roll back to savepoint `name` and forget its starting record. */
#define ROLLBACK_TO_SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name)) { \
    BUSTED(cq); \
  } \
  last_sp = NULL; \
} while(0)

/* Release savepoint `name` and forget its starting record. */
#define RELEASE_SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "RELEASE SAVEPOINT ", name)) { \
    BUSTED(cq); \
  } \
  last_sp = NULL; \
} while(0)
152 int
153 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure,
154                                   struct timeval *now) {
155   conn_q *cq = closure;
156   ds_job_detail *current, *last_sp;
157   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
158
159   if(!cq->head) return 0;
160
161  full_monty:
162   /* Make sure we have a connection */
163   while(stratcon_database_connect(cq)) {
164     noitL(noit_error, "Error connecting to database\n");
165     sleep(1);
166   }
167
168   current = cq->head;
169   last_sp = NULL;
170   if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq);
171   while(current) {
172     execute_outcome_t rv;
173     if(current->data) {
174       if(!last_sp) SAVEPOINT("batch");
175  
176       if(current->problematic) {
177         noitL(noit_error, "Failed noit line: %s", current->data);
178         RELEASE_SAVEPOINT("batch");
179         current = current->next;
180         continue;
181       }
182       rv = stratcon_datastore_execute(cq, cq->remote, current->data);
183       switch(rv) {
184         case DS_EXEC_SUCCESS:
185           current = current->next;
186           break;
187         case DS_EXEC_ROW_FAILED:
188           /* rollback to savepoint, mark this record as bad and start again */
189           current->problematic = 1;
190           current = last_sp;
191           ROLLBACK_TO_SAVEPOINT("batch");
192           break;
193         case DS_EXEC_TXN_FAILED:
194           BUSTED(cq);
195       }
196     }
197     if(current->completion_event) {
198       if(last_sp) RELEASE_SAVEPOINT("batch");
199       if(stratcon_datastore_do(cq, "COMMIT")) BUSTED(cq);
200       eventer_add(current->completion_event);
201       __remove_until(cq, current->next);
202       current = current->next;
203     }
204   }
205   return 0;
206 }
207 void
208 stratcon_datastore_push(stratcon_datastore_op_t op,
209                         struct sockaddr *remote, void *operand) {
210   conn_q *cq;
211   eventer_t e;
212   ds_job_detail *dsjd;
213
214   cq = __get_conn_q_for_remote(remote);
215   dsjd = calloc(1, sizeof(*dsjd));
216   switch(op) {
217     case DS_OP_INSERT:
218       dsjd->data = operand;
219       __append(cq, dsjd);
220       break;
221     case DS_OP_CHKPT:
222       dsjd->completion_event = operand;
223       __append(cq,dsjd);
224       e = eventer_alloc();
225       e->mask = EVENTER_ASYNCH;
226       e->callback = stratcon_datastore_asynch_execute;
227       e->closure = cq;
228       eventer_add(e);
229       break;
230   }
231 }
Note: See TracBrowser for help on using the browser.