root/src/modules/stomp_driver.c

Revision e2ae3eed2723823430eb1dcf4bfb3ef415c86289, 6.4 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 3 years ago)

docs for stomp_driver and postgres_ingestor

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "noit_module.h"
35 #include "eventer/eventer.h"
36 #include "utils/noit_log.h"
37 #include "stratcon_iep.h"
38 #include "noit_conf.h"
39 #include "libstomp.h"
40 #include "stomp_driver.xmlh"
41
42 struct stomp_driver {
43   stomp_connection *connection;
44   apr_pool_t *pool;
45   char *exchange;
46   char *user;
47   char *pass;
48   char *destination;
49   char *hostname;
50   int port;
51 };
52
53 static iep_thread_driver_t *noit_stomp_allocate() {
54   struct stomp_driver *dr;
55   dr = calloc(1, sizeof(*dr));
56   if(apr_pool_create(&dr->pool, NULL) != APR_SUCCESS) {
57     free(dr);
58     return NULL;
59   }
60   noit_conf_get_string(NULL, "/stratcon/iep/mq/exchange", &dr->exchange);
61   if(!noit_conf_get_string(NULL, "/stratcon/iep/mq/destination", &dr->destination))
62   if(!dr->destination) dr->destination = strdup("/queue/noit.firehose");
63   noit_conf_get_string(NULL, "/stratcon/iep/mq/username", &dr->user);
64   noit_conf_get_string(NULL, "/stratcon/iep/mq/password", &dr->pass);
65   noit_conf_get_string(NULL, "/stratcon/iep/mq/hostname", &dr->hostname);
66   if(!dr->hostname) dr->hostname = strdup("127.0.0.1");
67   if(!noit_conf_get_int(NULL, "/stratcon/iep/mq/port", &dr->port))
68     dr->port = 61613;
69   return (iep_thread_driver_t *)dr;
70 }
71 static int noit_stomp_disconnect(iep_thread_driver_t *d) {
72   struct stomp_driver *dr = (struct stomp_driver *)d;
73   if(dr->connection) {
74     stomp_disconnect(&dr->connection);
75     return 0;
76   }
77   return -1;
78 }
79 static void noit_stomp_deallocate(iep_thread_driver_t *d) {
80   struct stomp_driver *dr = (struct stomp_driver *)d;
81   if(dr->connection) stomp_disconnect(&dr->connection);
82   if(dr->pool) apr_pool_destroy(dr->pool);
83   if(dr->exchange) free(dr->exchange);
84   if(dr->destination) free(dr->destination);
85   if(dr->user) free(dr->user);
86   if(dr->pass) free(dr->pass);
87   if(dr->hostname) free(dr->hostname);
88   free(dr);
89 }
90 static int noit_stomp_connect(iep_thread_driver_t *dr) {
91   struct stomp_driver *driver = (struct stomp_driver *)dr;
92   apr_status_t rc;
93   stomp_frame frame;
94
95   if(!driver->connection) {
96     if(stomp_connect(&driver->connection, driver->hostname, driver->port,
97                      driver->pool)!= APR_SUCCESS) {
98       noitL(noit_error, "MQ connection failed\n");
99       stomp_disconnect(&driver->connection);
100       return -1;
101     }
102
103     frame.command = "CONNECT";
104     frame.headers = apr_hash_make(driver->pool);
105
106     // This is for RabbitMQ Support
107     if(driver->user && driver->pass) {
108       apr_hash_set(frame.headers, "login",
109                    APR_HASH_KEY_STRING, driver->user);
110       apr_hash_set(frame.headers, "passcode",
111                    APR_HASH_KEY_STRING, driver->pass);
112     }
113     if(driver->exchange)
114       apr_hash_set(frame.headers, "exchange", APR_HASH_KEY_STRING, driver->exchange);
115
116     frame.body = NULL;
117     frame.body_length = -1;
118     rc = stomp_write(driver->connection, &frame, driver->pool);
119     if(rc != APR_SUCCESS) {
120       noitL(noit_error, "MQ STOMP CONNECT failed, %d\n", rc);
121       stomp_disconnect(&driver->connection);
122       return -1;
123     }
124     return 0;
125   }
126   /* 1 means already connected */
127   return 1;
128 }
129 static int noit_stomp_submit(iep_thread_driver_t *dr,
130                              const char *payload, size_t payloadlen) {
131   struct stomp_driver *driver = (struct stomp_driver *)dr;
132   apr_pool_t *dummy;
133   apr_status_t rc;
134   stomp_frame out;
135
136   if(apr_pool_create(&dummy, NULL) != APR_SUCCESS) return -1;
137
138   out.command = "SEND";
139   out.headers = apr_hash_make(dummy);
140   if (driver->exchange)
141     apr_hash_set(out.headers, "exchange",
142                  APR_HASH_KEY_STRING, driver->exchange);
143
144   apr_hash_set(out.headers, "destination",
145                APR_HASH_KEY_STRING, driver->destination);
146   apr_hash_set(out.headers, "ack", APR_HASH_KEY_STRING, "auto");
147  
148   out.body_length = -1;
149   out.body = (char *)payload;
150   rc = stomp_write(driver->connection, &out, dummy);
151   if(rc != APR_SUCCESS) {
152     noitL(noit_error, "STOMP send failed, disconnecting\n");
153     if(driver->connection) stomp_disconnect(&driver->connection);
154     driver->connection = NULL;
155   }
156   apr_pool_destroy(dummy);
157   return (rc == APR_SUCCESS) ? 0 : -1;
158 }
159
160 mq_driver_t mq_driver_stomp = {
161   noit_stomp_allocate,
162   noit_stomp_connect,
163   noit_stomp_submit,
164   noit_stomp_disconnect,
165   noit_stomp_deallocate
166 };
167
168 static int noit_stomp_driver_config(noit_module_generic_t *self, noit_hash_table *o) {
169   return 0;
170 }
171 static int noit_stomp_driver_onload(noit_image_t *self) {
172   return 0;
173 }
174
175 static int noit_stomp_driver_init(noit_module_generic_t *self) {
176   apr_initialize();
177   atexit(apr_terminate);
178   stratcon_iep_mq_driver_register("stomp", &mq_driver_stomp);
179   return 0;
180 }
181
182 noit_module_generic_t stomp_driver = {
183   {
184     NOIT_GENERIC_MAGIC,
185     NOIT_GENERIC_ABI_VERSION,
186     "stomp_driver",
187     "STOMP driver for IEP MQ submission",
188     stomp_driver_xml_description,
189     noit_stomp_driver_onload
190   },
191   noit_stomp_driver_config,
192   noit_stomp_driver_init
193 };
194
Note: See TracBrowser for help on using the browser.