root/trunk/contrib/pg_amqp/pg_amqp.c

Revision 73, 10.0 kB (checked in by jesus, 4 years ago)

Add exchange declare, numerous fix-ups, autonomous publishing, and comments on functions.

Line 
1 /*
2  * Copyright (c) 2009, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  *
32  * Author: Theo Schlossnagle
33  *
34  */
35
36 #include <time.h>
37 #include <sys/time.h>
38
39 #include "postgres.h"
40 #include "funcapi.h"
41 #include "fmgr.h"
42 #include "miscadmin.h"
43 #include "pgstat.h"
44 #include "executor/spi.h"
45 #include "storage/lwlock.h"
46 #include "storage/shmem.h"
47 #include "storage/ipc.h"
48 #include "access/xact.h"
49 #include "utils/memutils.h"
50 #include "utils/builtins.h"
51 #include "librabbitmq/amqp.h"
52 #include "librabbitmq/amqp_framing.h"
53
/*
 * Point an amqp_bytes_t at the payload of a text function argument.
 *
 *   var - amqp_bytes_t lvalue to fill in
 *   col - zero-based index of the text argument
 *
 * When the argument is SQL NULL, var is left exactly as it was, so a caller
 * that wants a default must assign it before invoking the macro.  Nothing is
 * copied: var.bytes refers to the data of the text datum itself.  The macro
 * expands PG_ARGISNULL / PG_GETARG_TEXT_PP and is therefore only usable where
 * those macros are usable.
 */
#define set_bytes_from_text(var,col) do { \
  if(!PG_ARGISNULL(col)) { \
    text *arg_text_ = PG_GETARG_TEXT_PP(col); \
    (var).len = VARSIZE_ANY_EXHDR(arg_text_); \
    (var).bytes = VARDATA_ANY(arg_text_); \
  } \
} while(0)
61
62 #ifdef PG_MODULE_MAGIC
63 PG_MODULE_MAGIC;
64 #endif
65 void _PG_init(void);
66 Datum pg_amqp_exchange_declare(PG_FUNCTION_ARGS);
67 Datum pg_amqp_publish(PG_FUNCTION_ARGS);
68 Datum pg_amqp_autonomous_publish(PG_FUNCTION_ARGS);
69 Datum pg_amqp_disconnect(PG_FUNCTION_ARGS);
70
71 struct brokerstate {
72   int broker_id;
73   amqp_connection_state_t conn;
74   int sockfd;
75   int uncommitted;
76   int inerror;
77   struct brokerstate *next;
78 };
79
80 static struct brokerstate *HEAD_BS = NULL;
81
82 static void
83 local_amqp_disconnect_bs(struct brokerstate *bs) {
84   if(bs && bs->conn) {
85     int errorstate = bs->inerror;
86     amqp_connection_close(bs->conn, AMQP_REPLY_SUCCESS);
87     amqp_destroy_connection(bs->conn);
88     memset(bs, 0, sizeof(*bs));
89     bs->inerror = errorstate;
90   }
91 }
92 static void amqp_local_phase2(XactEvent event, void *arg) {
93   amqp_rpc_reply_t *reply;
94   struct brokerstate *bs;
95   switch(event) {
96     case XACT_EVENT_COMMIT:
97       for(bs = HEAD_BS; bs; bs = bs->next) {
98         if(bs->inerror) local_amqp_disconnect_bs(bs);
99         bs->inerror = 0;
100         if(!bs->uncommitted) continue;
101         if(bs->conn) amqp_tx_commit(bs->conn, 2, AMQP_EMPTY_TABLE);
102         reply = amqp_get_rpc_reply();
103         if(reply->reply_type != AMQP_RESPONSE_NORMAL) {
104           elog(WARNING, "amqp could not commit tx mode on broker %d", bs->broker_id);
105           local_amqp_disconnect_bs(bs);
106         }
107         bs->uncommitted = 0;
108       }
109       break;
110     case XACT_EVENT_ABORT:
111       for(bs = HEAD_BS; bs; bs = bs->next) {
112         if(bs->inerror) local_amqp_disconnect_bs(bs);
113         bs->inerror = 0;
114         if(!bs->uncommitted) continue;
115         if(bs->conn) amqp_tx_rollback(bs->conn, 2, AMQP_EMPTY_TABLE);
116         reply = amqp_get_rpc_reply();
117         if(reply->reply_type != AMQP_RESPONSE_NORMAL) {
118           elog(WARNING, "amqp could not commit tx mode on broker %d", bs->broker_id);
119           local_amqp_disconnect_bs(bs);
120         }
121         bs->uncommitted = 0;
122       }
123       break;
124     case XACT_EVENT_PREPARE:
125       /* nothin' */
126       return;
127       break;
128   }
129 }
130
131 void _PG_init() {
132   RegisterXactCallback(amqp_local_phase2, NULL);
133 }
134
135 static struct brokerstate *
136 local_amqp_get_a_bs(broker_id) {
137   struct brokerstate *bs;
138   for(bs = HEAD_BS; bs; bs = bs->next) {
139     if(bs->broker_id == broker_id) return bs;
140   }
141   bs = MemoryContextAllocZero(TopMemoryContext, sizeof(*bs));
142   bs->broker_id = broker_id;
143   bs->next = HEAD_BS;
144   HEAD_BS = bs;
145   return bs;
146 }
147 static struct brokerstate *
148 local_amqp_get_bs(broker_id) {
149   char sql[1024];
150   struct brokerstate *bs = local_amqp_get_a_bs(broker_id);
151   if(bs->conn) return bs;
152   if(SPI_connect() == SPI_ERROR_CONNECT) return NULL;
153   snprintf(sql, sizeof(sql), "SELECT host, port, vhost, username, password "
154                              "  FROM amqp.broker "
155                              " WHERE broker_id = %d", broker_id);
156   if(SPI_OK_SELECT == SPI_execute(sql, true, 1)) {
157     if(1 == SPI_processed) {
158       amqp_rpc_reply_t *reply, s_reply;
159       char *host, *vhost, *user, *pass;
160       Datum port_datum;
161       bool is_null;
162       int port = 5672;
163       host = SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1);
164       if(!host) host = "localhost";
165       port_datum = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 2, &is_null);
166       if(!is_null) port = DatumGetInt32(port_datum);
167       vhost = SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 3);
168       if(!vhost) vhost = "/";
169       user = SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 4);
170       if(!user) user = "guest";
171       pass = SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 5);
172       if(!pass) pass = "guest";
173       SPI_finish();
174
175       bs->conn = amqp_new_connection();
176       if(!bs->conn) return NULL;
177       bs->sockfd = amqp_open_socket(host, port);
178       if(bs->sockfd < 0) goto busted;
179       amqp_set_sockfd(bs->conn, bs->sockfd);
180       s_reply = amqp_login(bs->conn, vhost, 0, 131072,
181                            0, AMQP_SASL_METHOD_PLAIN,
182                            user, pass);
183       if(s_reply.reply_type != AMQP_RESPONSE_NORMAL) {
184         elog(WARNING, "amqp login failed on broker %d", broker_id);
185         goto busted;
186       }
187       amqp_channel_open(bs->conn, 1);
188       reply = amqp_get_rpc_reply();
189       if(reply->reply_type != AMQP_RESPONSE_NORMAL) {
190         elog(WARNING, "amqp channel open failed on broker %d", broker_id);
191         goto busted;
192       }
193       amqp_channel_open(bs->conn, 2);
194       reply = amqp_get_rpc_reply();
195       if(reply->reply_type != AMQP_RESPONSE_NORMAL) {
196         elog(WARNING, "amqp channel open failed on broker %d", broker_id);
197         goto busted;
198       }
199       amqp_tx_select(bs->conn, 2, AMQP_EMPTY_TABLE);
200       reply = amqp_get_rpc_reply();
201       if(reply->reply_type != AMQP_RESPONSE_NORMAL) {
202         elog(WARNING, "amqp could not start tx mode on broker %d", broker_id);
203         goto busted;
204       }
205     } else {
206       elog(WARNING, "amqp can't find broker %d", broker_id);
207       SPI_finish();
208     }
209   } else {
210     elog(WARNING, "amqp broker lookup query failed");
211     SPI_finish();
212   }
213   return bs;
214  busted:
215   local_amqp_disconnect_bs(bs);
216   return bs;
217 }
/*
 * Disconnect the broker identified by broker_id.
 *
 * The node is obtained through local_amqp_get_a_bs (which creates an empty
 * one if the broker has never been seen) and handed to
 * local_amqp_disconnect_bs, which is a no-op for an unconnected node.
 *
 * The parameter is declared explicitly as int; the previous identifier-only
 * parameter list relied on implicit int, which C99 and later reject.
 */
static void
local_amqp_disconnect(int broker_id) {
  struct brokerstate *bs = local_amqp_get_a_bs(broker_id);
  local_amqp_disconnect_bs(bs);
}
223
224 Datum
225 pg_amqp_exchange_declare(PG_FUNCTION_ARGS) {
226   struct brokerstate *bs;
227   if(!PG_ARGISNULL(0)) {
228     int broker_id;
229     broker_id = PG_GETARG_INT32(0);
230     bs = local_amqp_get_bs(broker_id);
231     if(bs && bs->conn) {
232       amqp_rpc_reply_t *reply;
233       amqp_bytes_t exchange_b;
234       amqp_bytes_t exchange_type_b;
235       amqp_boolean_t passive = 0;
236       amqp_boolean_t durable = 0;
237       amqp_boolean_t auto_delete = 0;
238
239       set_bytes_from_text(exchange_b,1);
240       set_bytes_from_text(exchange_type_b,2);
241       passive = PG_GETARG_BOOL(3);
242       durable = PG_GETARG_BOOL(4);
243       auto_delete = PG_GETARG_BOOL(5);
244       amqp_exchange_declare(bs->conn, 1,
245                             exchange_b, exchange_type_b,
246                             passive, durable, auto_delete, AMQP_EMPTY_TABLE);
247       reply = amqp_get_rpc_reply();
248       if(reply->reply_type == AMQP_RESPONSE_NORMAL)
249         PG_RETURN_BOOL(0 == 0);
250       bs->inerror = 1;
251     }
252   }
253   PG_RETURN_BOOL(0 != 0);
254 }
255 static Datum
256 pg_amqp_publish_opt(PG_FUNCTION_ARGS, int channel) {
257   struct brokerstate *bs;
258   if(!PG_ARGISNULL(0)) {
259     int broker_id;
260     broker_id = PG_GETARG_INT32(0);
261     bs = local_amqp_get_bs(broker_id);
262     if(bs && bs->conn && (channel == 1 || !bs->inerror)) {
263       int rv;
264       amqp_rpc_reply_t *reply;
265       amqp_boolean_t mandatory = 0;
266       amqp_boolean_t immediate = 0;
267       amqp_bytes_t exchange_b = amqp_cstring_bytes("amq.direct");
268       amqp_bytes_t routing_key_b = amqp_cstring_bytes("");
269       amqp_bytes_t body_b = amqp_cstring_bytes("");
270
271       set_bytes_from_text(exchange_b,1);
272       set_bytes_from_text(routing_key_b,2);
273       set_bytes_from_text(body_b,3);
274       rv = amqp_basic_publish(bs->conn, channel, exchange_b, routing_key_b,
275                               mandatory, immediate, NULL, body_b);
276       reply = amqp_get_rpc_reply();
277       if(rv || reply->reply_type != AMQP_RESPONSE_NORMAL) {
278         bs->inerror = 1;
279         PG_RETURN_BOOL(0 != 0);
280       }
281       /* channel two is transactional */
282       if(channel == 2) bs->uncommitted++;
283       PG_RETURN_BOOL(rv == 0);
284     }
285   }
286   PG_RETURN_BOOL(0 != 0);
287 }
288
289 PG_FUNCTION_INFO_V1(pg_amqp_publish);
290 Datum
291 pg_amqp_publish(PG_FUNCTION_ARGS) {
292   return pg_amqp_publish_opt(fcinfo, 2);
293 }
294
295 PG_FUNCTION_INFO_V1(pg_amqp_autonomous_publish);
296 Datum
297 pg_amqp_autonomous_publish(PG_FUNCTION_ARGS) {
298   return pg_amqp_publish_opt(fcinfo, 1);
299 }
300
301 PG_FUNCTION_INFO_V1(pg_amqp_disconnect);
302 Datum
303 pg_amqp_disconnect(PG_FUNCTION_ARGS) {
304   if(!PG_ARGISNULL(0)) {
305     int broker_id;
306     broker_id = PG_GETARG_INT32(0);
307     local_amqp_disconnect(broker_id);
308   }
309   PG_RETURN_VOID();
310 }
311
Note: See TracBrowser for help on using the browser.