| 8 | | |
|---|
| | 10 | #include "noit_conf.h" |
|---|
| | 11 | #include <unistd.h> |
|---|
| | 12 | #include <libpq-fe.h> |
|---|
| | 13 | |
|---|
| | 14 | typedef struct ds_job_detail { |
|---|
| | 15 | char *data; /* The raw string, NULL means the stream is done -- commit. */ |
|---|
| | 16 | int problematic; |
|---|
| | 17 | eventer_t completion_event; /* This event should be registered if non NULL */ |
|---|
| | 18 | struct ds_job_detail *next; |
|---|
| | 19 | } ds_job_detail; |
|---|
| | 20 | |
|---|
| | 21 | typedef struct { |
|---|
| | 22 | struct sockaddr *remote; |
|---|
| | 23 | eventer_jobq_t *jobq; |
|---|
| | 24 | PGconn *dbh; |
|---|
| | 25 | ds_job_detail *head; |
|---|
| | 26 | ds_job_detail *tail; |
|---|
| | 27 | } conn_q; |
|---|
| | 28 | |
|---|
| | 29 | void __append(conn_q *q, ds_job_detail *d) { |
|---|
| | 30 | d->next = NULL; |
|---|
| | 31 | if(!q->head) q->head = q->tail = d; |
|---|
| | 32 | else { |
|---|
| | 33 | q->tail->next = d; |
|---|
| | 34 | q->tail = d; |
|---|
| | 35 | } |
|---|
| | 36 | } |
|---|
| | 37 | void __remove_until(conn_q *q, ds_job_detail *d) { |
|---|
| | 38 | ds_job_detail *next; |
|---|
| | 39 | while(q->head && q->head != d) { |
|---|
| | 40 | next = q->head; |
|---|
| | 41 | q->head = q->head->next; |
|---|
| | 42 | if(next->data) free(next->data); |
|---|
| | 43 | free(next); |
|---|
| | 44 | } |
|---|
| | 45 | if(!q->head) q->tail = NULL; |
|---|
| | 46 | } |
|---|
| | 47 | |
|---|
| | 48 | noit_hash_table ds_conns; |
|---|
| | 49 | |
|---|
| | 50 | conn_q * |
|---|
| | 51 | __get_conn_q_for_remote(struct sockaddr *remote) { |
|---|
| | 52 | conn_q *cq; |
|---|
| | 53 | if(noit_hash_retrieve(&ds_conns, (const char *)remote, remote->sa_len, |
|---|
| | 54 | (void **)&cq)) |
|---|
| | 55 | return cq; |
|---|
| | 56 | cq = calloc(1, sizeof(*cq)); |
|---|
| | 57 | cq->remote = malloc(remote->sa_len); |
|---|
| | 58 | memcpy(cq->remote, remote, remote->sa_len); |
|---|
| | 59 | cq->jobq = calloc(1, sizeof(*cq->jobq)); |
|---|
| | 60 | eventer_jobq_init(cq->jobq); |
|---|
| | 61 | cq->jobq->backq = eventer_default_backq(); |
|---|
| | 62 | /* Add one thread */ |
|---|
| | 63 | eventer_jobq_increase_concurrency(cq->jobq); |
|---|
| | 64 | noit_hash_store(&ds_conns, (const char *)cq->remote, cq->remote->sa_len, cq); |
|---|
| | 65 | return cq; |
|---|
| | 66 | } |
|---|
| | 67 | |
|---|
| | 68 | typedef enum { |
|---|
| | 69 | DS_EXEC_SUCCESS = 0, |
|---|
| | 70 | DS_EXEC_ROW_FAILED = 1, |
|---|
| | 71 | DS_EXEC_TXN_FAILED = 2, |
|---|
| | 72 | } execute_outcome_t; |
|---|
| | 73 | |
|---|
| | 74 | execute_outcome_t |
|---|
| | 75 | stratcon_datastore_execute(conn_q *cq, struct sockaddr *r, const char *data) { |
|---|
| | 76 | |
|---|
| | 77 | return DS_EXEC_ROW_FAILED; |
|---|
| | 78 | } |
|---|
| | 79 | static int |
|---|
| | 80 | stratcon_database_connect(conn_q *cq) { |
|---|
| | 81 | char dsn[512]; |
|---|
| | 82 | noit_hash_iter iter = NOIT_HASH_ITER_ZERO; |
|---|
| | 83 | const char *k, *v; |
|---|
| | 84 | int klen; |
|---|
| | 85 | noit_hash_table *t; |
|---|
| | 86 | |
|---|
| | 87 | if(cq->dbh) { |
|---|
| | 88 | if(PQstatus(cq->dbh) == CONNECTION_OK) return 0; |
|---|
| | 89 | PQreset(cq->dbh); |
|---|
| | 90 | if(PQstatus(cq->dbh) == CONNECTION_OK) return 0; |
|---|
| | 91 | noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n", |
|---|
| | 92 | dsn, PQerrorMessage(cq->dbh)); |
|---|
| | 93 | return -1; |
|---|
| | 94 | } |
|---|
| | 95 | |
|---|
| | 96 | dsn[0] = '\0'; |
|---|
| | 97 | t = noit_conf_get_hash(NULL, "/stratcon/noits/dbconfig"); |
|---|
| | 98 | while(noit_hash_next(t, &iter, &k, &klen, (void **)&v)) { |
|---|
| | 99 | if(dsn[0]) strlcat(dsn, " ", sizeof(dsn)); |
|---|
| | 100 | strlcat(dsn, k, sizeof(dsn)); |
|---|
| | 101 | strlcat(dsn, "=", sizeof(dsn)); |
|---|
| | 102 | strlcat(dsn, v, sizeof(dsn)); |
|---|
| | 103 | } |
|---|
| | 104 | cq->dbh = PQconnectdb(dsn); |
|---|
| | 105 | if(!cq->dbh) return -1; |
|---|
| | 106 | if(PQstatus(cq->dbh) == CONNECTION_OK) return 0; |
|---|
| | 107 | noitL(noit_error, "Error connection to database: '%s'\nError: %s\n", |
|---|
| | 108 | dsn, PQerrorMessage(cq->dbh)); |
|---|
| | 109 | return -1; |
|---|
| | 110 | } |
|---|
| | 111 | static int |
|---|
| | 112 | stratcon_datastore_savepoint_op(conn_q *cq, const char *p, |
|---|
| | 113 | const char *name) { |
|---|
| | 114 | int rv; |
|---|
| | 115 | PGresult *res; |
|---|
| | 116 | char cmd[128]; |
|---|
| | 117 | strlcpy(cmd, p, sizeof(cmd)); |
|---|
| | 118 | strlcat(cmd, name, sizeof(cmd)); |
|---|
| | 119 | if((res = PQexec(cq->dbh, cmd)) == NULL) return -1; |
|---|
| | 120 | if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0; |
|---|
| | 121 | PQclear(res); |
|---|
| | 122 | return rv; |
|---|
| | 123 | } |
|---|
| | 124 | static int |
|---|
| | 125 | stratcon_datastore_do(conn_q *cq, const char *cmd) { |
|---|
| | 126 | PGresult *res; |
|---|
| | 127 | int rv = -1; |
|---|
| | 128 | if((res = PQexec(cq->dbh, cmd)) == NULL) return -1; |
|---|
| | 129 | if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0; |
|---|
| | 130 | PQclear(res); |
|---|
| | 131 | return rv; |
|---|
| | 132 | } |
|---|
| | 133 | #define BUSTED(cq) do { \ |
|---|
| | 134 | PQfinish((cq)->dbh); \ |
|---|
| | 135 | (cq)->dbh = NULL; \ |
|---|
| | 136 | goto full_monty; \ |
|---|
| | 137 | } while(0) |
|---|
| | 138 | #define SAVEPOINT(name) do { \ |
|---|
| | 139 | if(stratcon_datastore_savepoint_op(cq, "SAVEPOINT ", name)) BUSTED(cq); \ |
|---|
| | 140 | last_sp = current; \ |
|---|
| | 141 | } while(0) |
|---|
| | 142 | #define ROLLBACK_TO_SAVEPOINT(name) do { \ |
|---|
| | 143 | if(stratcon_datastore_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name)) \ |
|---|
| | 144 | BUSTED(cq); \ |
|---|
| | 145 | last_sp = NULL; \ |
|---|
| | 146 | } while(0) |
|---|
| | 147 | #define RELEASE_SAVEPOINT(name) do { \ |
|---|
| | 148 | if(stratcon_datastore_savepoint_op(cq, "RELEASE SAVEPOINT ", name)) \ |
|---|
| | 149 | BUSTED(cq); \ |
|---|
| | 150 | last_sp = NULL; \ |
|---|
| | 151 | } while(0) |
|---|
| | 152 | int |
|---|
| | 153 | stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure, |
|---|
| | 154 | struct timeval *now) { |
|---|
| | 155 | conn_q *cq = closure; |
|---|
| | 156 | ds_job_detail *current, *last_sp; |
|---|
| | 157 | if(!(mask & EVENTER_ASYNCH_WORK)) return 0; |
|---|
| | 158 | |
|---|
| | 159 | if(!cq->head) return 0; |
|---|
| | 160 | |
|---|
| | 161 | full_monty: |
|---|
| | 162 | /* Make sure we have a connection */ |
|---|
| | 163 | while(stratcon_database_connect(cq)) { |
|---|
| | 164 | noitL(noit_error, "Error connecting to database\n"); |
|---|
| | 165 | sleep(1); |
|---|
| | 166 | } |
|---|
| | 167 | |
|---|
| | 168 | current = cq->head; |
|---|
| | 169 | last_sp = NULL; |
|---|
| | 170 | if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq); |
|---|
| | 171 | while(current) { |
|---|
| | 172 | execute_outcome_t rv; |
|---|
| | 173 | if(current->data) { |
|---|
| | 174 | if(!last_sp) SAVEPOINT("batch"); |
|---|
| | 175 | |
|---|
| | 176 | if(current->problematic) { |
|---|
| | 177 | noitL(noit_error, "Failed noit line: %s", current->data); |
|---|
| | 178 | RELEASE_SAVEPOINT("batch"); |
|---|
| | 179 | current = current->next; |
|---|
| | 180 | continue; |
|---|
| | 181 | } |
|---|
| | 182 | rv = stratcon_datastore_execute(cq, cq->remote, current->data); |
|---|
| | 183 | switch(rv) { |
|---|
| | 184 | case DS_EXEC_SUCCESS: |
|---|
| | 185 | current = current->next; |
|---|
| | 186 | break; |
|---|
| | 187 | case DS_EXEC_ROW_FAILED: |
|---|
| | 188 | /* rollback to savepoint, mark this record as bad and start again */ |
|---|
| | 189 | current->problematic = 1; |
|---|
| | 190 | current = last_sp; |
|---|
| | 191 | ROLLBACK_TO_SAVEPOINT("batch"); |
|---|
| | 192 | break; |
|---|
| | 193 | case DS_EXEC_TXN_FAILED: |
|---|
| | 194 | BUSTED(cq); |
|---|
| | 195 | } |
|---|
| | 196 | } |
|---|
| | 197 | if(current->completion_event) { |
|---|
| | 198 | if(last_sp) RELEASE_SAVEPOINT("batch"); |
|---|
| | 199 | if(stratcon_datastore_do(cq, "COMMIT")) BUSTED(cq); |
|---|
| | 200 | eventer_add(current->completion_event); |
|---|
| | 201 | __remove_until(cq, current->next); |
|---|
| | 202 | current = current->next; |
|---|
| | 203 | } |
|---|
| | 204 | } |
|---|
| | 205 | return 0; |
|---|
| | 206 | } |
|---|