Show
Ignore:
Timestamp:
03/03/08 04:52:55 (6 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1204519975 +0000
git-parent:

[ae5b0cb703e7808517d5e1aff0f2da999f35b1e9]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1204519975 +0000
Message:

datasource driver based around postgres libpq, logic is there, just need to do the INSERTs

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/stratcon.conf

    ra7304b5 rdf5d455  
    11<stratcon> 
    22  <eventer implementation="kqueue" /> 
     3 
     4  <logs> 
     5    <console_output> 
     6      <outlet name="stderr"/> 
     7      <log name="error"/> 
     8      <log name="debug" disabled="true"/> 
     9    </console_output> 
     10  </logs> 
    311 
    412  <noits> 
     
    1220      <reconnect_maximum_interval>900000</reconnect_maximum_interval> 
    1321    </config> 
     22    <dbconfig> 
     23      <host>localhost</host> 
     24      <dbname>reconnoiter</dbname> 
     25      <user>reconnoiter</user> 
     26      <password>reconnoiter</password> 
     27    </dbconfig> 
    1428    <sslconfig> 
    1529      <key_file>test-strat.key</key_file> 
  • src/stratcon_datastore.c

    ra7304b5 rdf5d455  
    55 
    66#include "noit_defines.h" 
     7#include "eventer/eventer.h" 
     8#include "utils/noit_log.h" 
    79#include "stratcon_datastore.h" 
    8  
     10#include "noit_conf.h" 
     11#include <unistd.h> 
     12#include <libpq-fe.h> 
     13 
     14typedef struct ds_job_detail { 
     15  char *data;  /* The raw string, NULL means the stream is done -- commit. */ 
     16  int problematic; 
     17  eventer_t completion_event; /* This event should be registered if non NULL */ 
     18  struct ds_job_detail *next; 
     19} ds_job_detail; 
     20 
     21typedef struct { 
     22  struct sockaddr *remote; 
     23  eventer_jobq_t  *jobq; 
     24  PGconn          *dbh; 
     25  ds_job_detail   *head; 
     26  ds_job_detail   *tail; 
     27} conn_q; 
     28 
     29void __append(conn_q *q, ds_job_detail *d) { 
     30  d->next = NULL; 
     31  if(!q->head) q->head = q->tail = d; 
     32  else { 
     33    q->tail->next = d; 
     34    q->tail = d; 
     35  } 
     36
     37void __remove_until(conn_q *q, ds_job_detail *d) { 
     38  ds_job_detail *next; 
     39  while(q->head && q->head != d) { 
     40    next = q->head; 
     41    q->head = q->head->next; 
     42    if(next->data) free(next->data); 
     43    free(next); 
     44  } 
     45  if(!q->head) q->tail = NULL; 
     46
     47 
     48noit_hash_table ds_conns; 
     49 
     50conn_q * 
     51__get_conn_q_for_remote(struct sockaddr *remote) { 
     52  conn_q *cq; 
     53  if(noit_hash_retrieve(&ds_conns, (const char *)remote, remote->sa_len, 
     54                        (void **)&cq)) 
     55    return cq; 
     56  cq = calloc(1, sizeof(*cq)); 
     57  cq->remote = malloc(remote->sa_len); 
     58  memcpy(cq->remote, remote, remote->sa_len); 
     59  cq->jobq = calloc(1, sizeof(*cq->jobq)); 
     60  eventer_jobq_init(cq->jobq); 
     61  cq->jobq->backq = eventer_default_backq(); 
     62  /* Add one thread */ 
     63  eventer_jobq_increase_concurrency(cq->jobq); 
     64  noit_hash_store(&ds_conns, (const char *)cq->remote, cq->remote->sa_len, cq); 
     65  return cq; 
     66
     67 
     68typedef enum { 
     69  DS_EXEC_SUCCESS = 0, 
     70  DS_EXEC_ROW_FAILED = 1, 
     71  DS_EXEC_TXN_FAILED = 2, 
     72} execute_outcome_t; 
     73 
     74execute_outcome_t 
     75stratcon_datastore_execute(conn_q *cq, struct sockaddr *r, const char *data) { 
     76   
     77  return DS_EXEC_ROW_FAILED; 
     78
     79static int 
     80stratcon_database_connect(conn_q *cq) { 
     81  char dsn[512]; 
     82  noit_hash_iter iter = NOIT_HASH_ITER_ZERO; 
     83  const char *k, *v; 
     84  int klen; 
     85  noit_hash_table *t; 
     86 
     87  if(cq->dbh) { 
     88    if(PQstatus(cq->dbh) == CONNECTION_OK) return 0; 
     89    PQreset(cq->dbh); 
     90    if(PQstatus(cq->dbh) == CONNECTION_OK) return 0; 
     91    noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n", 
     92          dsn, PQerrorMessage(cq->dbh)); 
     93    return -1; 
     94  } 
     95 
     96  dsn[0] = '\0'; 
     97  t = noit_conf_get_hash(NULL, "/stratcon/noits/dbconfig"); 
     98  while(noit_hash_next(t, &iter, &k, &klen, (void **)&v)) { 
     99    if(dsn[0]) strlcat(dsn, " ", sizeof(dsn)); 
     100    strlcat(dsn, k, sizeof(dsn)); 
     101    strlcat(dsn, "=", sizeof(dsn)); 
     102    strlcat(dsn, v, sizeof(dsn)); 
     103  } 
     104  cq->dbh = PQconnectdb(dsn); 
     105  if(!cq->dbh) return -1; 
     106  if(PQstatus(cq->dbh) == CONNECTION_OK) return 0; 
     107  noitL(noit_error, "Error connection to database: '%s'\nError: %s\n", 
     108        dsn, PQerrorMessage(cq->dbh)); 
     109  return -1; 
     110
     111static int 
     112stratcon_datastore_savepoint_op(conn_q *cq, const char *p, 
     113                                const char *name) { 
     114  int rv; 
     115  PGresult *res; 
     116  char cmd[128]; 
     117  strlcpy(cmd, p, sizeof(cmd)); 
     118  strlcat(cmd, name, sizeof(cmd)); 
     119  if((res = PQexec(cq->dbh, cmd)) == NULL) return -1; 
     120  if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0; 
     121  PQclear(res); 
     122  return rv; 
     123
     124static int 
     125stratcon_datastore_do(conn_q *cq, const char *cmd) { 
     126  PGresult *res; 
     127  int rv = -1; 
     128  if((res = PQexec(cq->dbh, cmd)) == NULL) return -1; 
     129  if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0; 
     130  PQclear(res); 
     131  return rv; 
     132
     133#define BUSTED(cq) do { \ 
     134  PQfinish((cq)->dbh); \ 
     135  (cq)->dbh = NULL; \ 
     136  goto full_monty; \ 
     137} while(0) 
     138#define SAVEPOINT(name) do { \ 
     139  if(stratcon_datastore_savepoint_op(cq, "SAVEPOINT ", name)) BUSTED(cq); \ 
     140  last_sp = current; \ 
     141} while(0) 
     142#define ROLLBACK_TO_SAVEPOINT(name) do { \ 
     143  if(stratcon_datastore_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name)) \ 
     144    BUSTED(cq); \ 
     145  last_sp = NULL; \ 
     146} while(0) 
     147#define RELEASE_SAVEPOINT(name) do { \ 
     148  if(stratcon_datastore_savepoint_op(cq, "RELEASE SAVEPOINT ", name)) \ 
     149    BUSTED(cq); \ 
     150  last_sp = NULL; \ 
     151} while(0) 
     152int 
     153stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure, 
     154                                  struct timeval *now) { 
     155  conn_q *cq = closure; 
     156  ds_job_detail *current, *last_sp; 
     157  if(!(mask & EVENTER_ASYNCH_WORK)) return 0; 
     158 
     159  if(!cq->head) return 0;  
     160 
     161 full_monty: 
     162  /* Make sure we have a connection */ 
     163  while(stratcon_database_connect(cq)) { 
     164    noitL(noit_error, "Error connecting to database\n"); 
     165    sleep(1); 
     166  } 
     167 
     168  current = cq->head;  
     169  last_sp = NULL; 
     170  if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq); 
     171  while(current) { 
     172    execute_outcome_t rv; 
     173    if(current->data) { 
     174      if(!last_sp) SAVEPOINT("batch"); 
     175  
     176      if(current->problematic) { 
     177        noitL(noit_error, "Failed noit line: %s", current->data); 
     178        RELEASE_SAVEPOINT("batch"); 
     179        current = current->next; 
     180        continue; 
     181      }  
     182      rv = stratcon_datastore_execute(cq, cq->remote, current->data); 
     183      switch(rv) { 
     184        case DS_EXEC_SUCCESS: 
     185          current = current->next; 
     186          break; 
     187        case DS_EXEC_ROW_FAILED: 
     188          /* rollback to savepoint, mark this record as bad and start again */ 
     189          current->problematic = 1; 
     190          current = last_sp; 
     191          ROLLBACK_TO_SAVEPOINT("batch"); 
     192          break; 
     193        case DS_EXEC_TXN_FAILED: 
     194          BUSTED(cq); 
     195      } 
     196    } 
     197    if(current->completion_event) { 
     198      if(last_sp) RELEASE_SAVEPOINT("batch"); 
     199      if(stratcon_datastore_do(cq, "COMMIT")) BUSTED(cq); 
     200      eventer_add(current->completion_event); 
     201      __remove_until(cq, current->next); 
     202      current = current->next; 
     203    } 
     204  } 
     205  return 0; 
     206
    9207void 
    10208stratcon_datastore_push(stratcon_datastore_op_t op, 
    11209                        struct sockaddr *remote, void *operand) { 
    12 
     210  conn_q *cq; 
     211  eventer_t e; 
     212  ds_job_detail *dsjd; 
     213 
     214  cq = __get_conn_q_for_remote(remote); 
     215  dsjd = calloc(1, sizeof(*dsjd)); 
     216  switch(op) { 
     217    case DS_OP_INSERT: 
     218      dsjd->data = operand; 
     219      __append(cq, dsjd); 
     220      break; 
     221    case DS_OP_CHKPT: 
     222      dsjd->completion_event = operand; 
     223      __append(cq,dsjd); 
     224      e = eventer_alloc(); 
     225      e->mask = EVENTER_ASYNCH; 
     226      e->callback = stratcon_datastore_asynch_execute; 
     227      e->closure = cq; 
     228      eventer_add(e); 
     229      break; 
     230  } 
     231