Changeset 73

Show
Ignore:
Timestamp:
12/19/09 22:22:01 (4 years ago)
Author:
jesus
Message:

exchange declare, lots of fix up and autonomous publishing + comments on functions.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/ext/pg_amqp/librabbitmq/amqp.h

    r72 r73  
    310310            amqp_table_t arguments); 
    311311 
     312extern struct amqp_basic_return_t_ *amqp_basic_return(amqp_connection_state_t state, 
     313            amqp_channel_t channel, 
     314            amqp_table_t arguments); 
     315 
    312316/* 
    313317 * Can be used to see if there is data still in the buffer, if so 
  • trunk/ext/pg_amqp/librabbitmq/amqp_api.c

    r72 r73  
    311311  return RPC_REPLY(amqp_tx_rollback_ok_t); 
    312312} 
     313 
     314amqp_basic_return_t *amqp_basic_return(amqp_connection_state_t state, 
     315                                       amqp_channel_t channel, 
     316                                       amqp_table_t arguments) 
     317{ 
     318  amqp_rpc_reply_t *amqp_rpc_reply; 
     319  amqp_rpc_reply = amqp_get_rpc_reply(); 
     320  *amqp_rpc_reply = 
     321    AMQP_SIMPLE_RPC(state, channel, BASIC, RETURN, RETURN, 
     322                    amqp_basic_return_t); 
     323  return RPC_REPLY(amqp_basic_return_t); 
     324} 
  • trunk/ext/pg_amqp/pg_amqp.c

    r72 r73  
    5252#include "librabbitmq/amqp_framing.h" 
    5353 
     54#define set_bytes_from_text(var,col) do { \ 
     55  if(!PG_ARGISNULL(col)) { \ 
     56    text *txt = PG_GETARG_TEXT_PP(col); \ 
     57    var.bytes = VARDATA_ANY(txt); \ 
     58    var.len = VARSIZE_ANY_EXHDR(txt); \ 
     59  } \ 
     60} while(0) 
     61 
    5462#ifdef PG_MODULE_MAGIC 
    5563PG_MODULE_MAGIC; 
    5664#endif 
    5765void _PG_init(void); 
     66Datum pg_amqp_exchange_declare(PG_FUNCTION_ARGS); 
    5867Datum pg_amqp_publish(PG_FUNCTION_ARGS); 
     68Datum pg_amqp_autonomous_publish(PG_FUNCTION_ARGS); 
    5969Datum pg_amqp_disconnect(PG_FUNCTION_ARGS); 
    6070 
     
    6474  int sockfd; 
    6575  int uncommitted; 
     76  int inerror; 
    6677  struct brokerstate *next; 
    6778}; 
     
    7283local_amqp_disconnect_bs(struct brokerstate *bs) { 
    7384  if(bs && bs->conn) { 
     85    int errorstate = bs->inerror; 
    7486    amqp_connection_close(bs->conn, AMQP_REPLY_SUCCESS); 
    7587    amqp_destroy_connection(bs->conn); 
    7688    memset(bs, 0, sizeof(*bs)); 
     89    bs->inerror = errorstate; 
    7790  } 
    7891} 
     
    8396    case XACT_EVENT_COMMIT: 
    8497      for(bs = HEAD_BS; bs; bs = bs->next) { 
     98        if(bs->inerror) local_amqp_disconnect_bs(bs); 
     99        bs->inerror = 0; 
    85100        if(!bs->uncommitted) continue; 
    86         amqp_tx_commit(bs->conn, 1, AMQP_EMPTY_TABLE); 
     101        if(bs->conn) amqp_tx_commit(bs->conn, 2, AMQP_EMPTY_TABLE); 
    87102        reply = amqp_get_rpc_reply(); 
    88103        if(reply->reply_type != AMQP_RESPONSE_NORMAL) { 
     
    95110    case XACT_EVENT_ABORT: 
    96111      for(bs = HEAD_BS; bs; bs = bs->next) { 
     112        if(bs->inerror) local_amqp_disconnect_bs(bs); 
     113        bs->inerror = 0; 
    97114        if(!bs->uncommitted) continue; 
    98         amqp_tx_rollback(bs->conn, 1, AMQP_EMPTY_TABLE); 
     115        if(bs->conn) amqp_tx_rollback(bs->conn, 2, AMQP_EMPTY_TABLE); 
    99116        reply = amqp_get_rpc_reply(); 
    100117        if(reply->reply_type != AMQP_RESPONSE_NORMAL) { 
     
    174191        goto busted; 
    175192      } 
    176       amqp_tx_select(bs->conn, 1, AMQP_EMPTY_TABLE); 
     193      amqp_channel_open(bs->conn, 2); 
     194      reply = amqp_get_rpc_reply(); 
     195      if(reply->reply_type != AMQP_RESPONSE_NORMAL) { 
     196        elog(WARNING, "amqp channel open failed on broker %d", broker_id); 
     197        goto busted; 
     198      } 
     199      amqp_tx_select(bs->conn, 2, AMQP_EMPTY_TABLE); 
    177200      reply = amqp_get_rpc_reply(); 
    178201      if(reply->reply_type != AMQP_RESPONSE_NORMAL) { 
     
    199222} 
    200223 
    201 PG_FUNCTION_INFO_V1(pg_amqp_publish); 
    202224Datum 
    203 pg_amqp_publish(PG_FUNCTION_ARGS) { 
     225pg_amqp_exchange_declare(PG_FUNCTION_ARGS) { 
    204226  struct brokerstate *bs; 
    205227  if(!PG_ARGISNULL(0)) { 
     
    208230    bs = local_amqp_get_bs(broker_id); 
    209231    if(bs && bs->conn) { 
     232      amqp_rpc_reply_t *reply; 
     233      amqp_bytes_t exchange_b; 
     234      amqp_bytes_t exchange_type_b; 
     235      amqp_boolean_t passive = 0; 
     236      amqp_boolean_t durable = 0; 
     237      amqp_boolean_t auto_delete = 0; 
     238 
     239      set_bytes_from_text(exchange_b,1); 
     240      set_bytes_from_text(exchange_type_b,2); 
     241      passive = PG_GETARG_BOOL(3); 
     242      durable = PG_GETARG_BOOL(4); 
     243      auto_delete = PG_GETARG_BOOL(5); 
     244      amqp_exchange_declare(bs->conn, 1, 
     245                            exchange_b, exchange_type_b, 
     246                            passive, durable, auto_delete, AMQP_EMPTY_TABLE); 
     247      reply = amqp_get_rpc_reply(); 
     248      if(reply->reply_type == AMQP_RESPONSE_NORMAL) 
     249        PG_RETURN_BOOL(0 == 0); 
     250      bs->inerror = 1; 
     251    } 
     252  } 
     253  PG_RETURN_BOOL(0 != 0); 
     254} 
     255static Datum 
     256pg_amqp_publish_opt(PG_FUNCTION_ARGS, int channel) { 
     257  struct brokerstate *bs; 
     258  if(!PG_ARGISNULL(0)) { 
     259    int broker_id; 
     260    broker_id = PG_GETARG_INT32(0); 
     261    bs = local_amqp_get_bs(broker_id); 
     262    if(bs && bs->conn && (channel == 1 || !bs->inerror)) { 
    210263      int rv; 
    211264      amqp_rpc_reply_t *reply; 
     
    216269      amqp_bytes_t body_b = amqp_cstring_bytes(""); 
    217270 
    218 #define set_bytes_from_test(var,col) do { \ 
    219   if(!PG_ARGISNULL(col)) { \ 
    220     text *txt = PG_GETARG_TEXT_PP(col); \ 
    221     var.bytes = VARDATA_ANY(txt); \ 
    222     var.len = VARSIZE_ANY_EXHDR(txt); \ 
    223   } \ 
    224 } while(0) 
    225       set_bytes_from_test(exchange_b,1); 
    226       set_bytes_from_test(routing_key_b,2); 
    227       set_bytes_from_test(body_b,3); 
    228       rv = amqp_basic_publish(bs->conn, 1, exchange_b, routing_key_b, 
     271      set_bytes_from_text(exchange_b,1); 
     272      set_bytes_from_text(routing_key_b,2); 
     273      set_bytes_from_text(body_b,3); 
     274      rv = amqp_basic_publish(bs->conn, channel, exchange_b, routing_key_b, 
    229275                              mandatory, immediate, NULL, body_b); 
    230276      reply = amqp_get_rpc_reply(); 
    231       if(rv || reply->reply_type != AMQP_RESPONSE_NORMAL) PG_RETURN_BOOL(0 != 0); 
    232       bs->uncommitted++; 
     277      if(rv || reply->reply_type != AMQP_RESPONSE_NORMAL) { 
     278        bs->inerror = 1; 
     279        PG_RETURN_BOOL(0 != 0); 
     280      } 
     281      /* channel two is transactional */ 
     282      if(channel == 2) bs->uncommitted++; 
    233283      PG_RETURN_BOOL(rv == 0); 
    234284    } 
    235285  } 
    236286  PG_RETURN_BOOL(0 != 0); 
     287} 
     288 
     289PG_FUNCTION_INFO_V1(pg_amqp_publish); 
     290Datum 
     291pg_amqp_publish(PG_FUNCTION_ARGS) { 
     292  return pg_amqp_publish_opt(fcinfo, 2); 
     293} 
     294 
     295PG_FUNCTION_INFO_V1(pg_amqp_autonomous_publish); 
     296Datum 
     297pg_amqp_autonomous_publish(PG_FUNCTION_ARGS) { 
     298  return pg_amqp_publish_opt(fcinfo, 1); 
    237299} 
    238300 
  • trunk/ext/pg_amqp/pg_amqp.sql

    r72 r73  
    11BEGIN; 
    22create schema amqp; 
     3 
     4create function amqp.exchange_declare(integer, varchar, varchar, boolean, boolean, boolean) 
     5returns boolean as 'pg_amqp.so', 'pg_amqp_exchange_declare' 
     6language C immutable; 
     7 
     8comment on function amqp.exchange_declare(integer, varchar, varchar, boolean, boolean, boolean) is 
     9'Declares a exchange (broker_id, exchange_name, exchange_type, passive, durable, auto_delete) 
     10auto_delete should be set to false as unexpected errors can cause disconnect/reconnect which 
     11would trigger the auto deletion of the exchange.'; 
    312 
    413create function amqp.publish(integer, varchar, varchar, varchar) 
     
    615language C immutable; 
    716 
     17comment on function amqp.publish(integer, varchar, varchar, varchar) is 
     18'Publishes a message (broker_id, exchange, routing_key, message).  The message will only 
     19be published if the containing PostgreSQL transaction successfully commits.  Under certain 
     20circumstances, the AMQP commit might fail.  In this case, a WARNING is emitted. 
     21 
     22Publish returns a boolean indicating if the publish command was successful.  Note that as 
     23AMQP publish is asynchronous, you may find out later it was unsuccessful.'; 
     24 
     25create function amqp.autonomous_publish(integer, varchar, varchar, varchar) 
     26returns boolean as 'pg_amqp.so', 'pg_amqp_autonomous_publish' 
     27language C immutable; 
     28 
     29comment on function amqp.autonomous_publish(integer, varchar, varchar, varchar) is 
     30'Works as amqp.publish does, but the message is published immediately irrespective of the 
     31current transaction state.  PostgreSQL commit and rollback at a later point will have no 
     32effect on this message being sent to AMQP.'; 
     33 
    834create function amqp.disconnect(integer) 
    935returns void as 'pg_amqp.so', 'pg_amqp_disconnect' 
    1036language C immutable strict; 
     37 
     38comment on function amqp.disconnect(integer) is 
     39'Explicitly disconnect the specified (broker_id) if it is current connected. Broker 
     40connections, once established, live until the PostgreSQL backend terminated.  This 
     41allows for more precise control over that. 
     42select amqp.disconnect(broker_id) from amqp.broker 
     43will disconnect any brokers that may be connected.'; 
    1144 
    1245create table amqp.broker (