root/src/modules/stomp_driver.c

Revision 7afb4e334fa8390d0543fc8d916d5c6b861511a9, 6.3 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

fixes #219

This is "significant" as it requires adding a module section to
stratcon.conf and not using the <stomp> stanza, but instead using
<mq type="stomp">. I've tested it with ActiveMQ and RabbitMQ and
both work fine.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "noit_module.h"
35 #include "eventer/eventer.h"
36 #include "utils/noit_log.h"
37 #include "stratcon_iep.h"
38 #include "noit_conf.h"
39 #include "libstomp.h"
40
41 struct stomp_driver {
42   stomp_connection *connection;
43   apr_pool_t *pool;
44   char *exchange;
45   char *user;
46   char *pass;
47   char *destination;
48   char *hostname;
49   int port;
50 };
51
52 static iep_thread_driver_t *noit_stomp_allocate() {
53   struct stomp_driver *dr;
54   dr = calloc(1, sizeof(*dr));
55   if(apr_pool_create(&dr->pool, NULL) != APR_SUCCESS) {
56     free(dr);
57     return NULL;
58   }
59   noit_conf_get_string(NULL, "/stratcon/iep/mq/exchange", &dr->exchange);
60   if(!noit_conf_get_string(NULL, "/stratcon/iep/mq/destination", &dr->destination))
61   if(!dr->destination) dr->destination = strdup("/queue/noit.firehose");
62   noit_conf_get_string(NULL, "/stratcon/iep/mq/username", &dr->user);
63   noit_conf_get_string(NULL, "/stratcon/iep/mq/password", &dr->pass);
64   noit_conf_get_string(NULL, "/stratcon/iep/mq/hostname", &dr->hostname);
65   if(!dr->hostname) dr->hostname = strdup("127.0.0.1");
66   if(!noit_conf_get_int(NULL, "/stratcon/iep/mq/port", &dr->port))
67     dr->port = 61613;
68   return (iep_thread_driver_t *)dr;
69 }
70 static int noit_stomp_disconnect(iep_thread_driver_t *d) {
71   struct stomp_driver *dr = (struct stomp_driver *)d;
72   if(dr->connection) {
73     stomp_disconnect(&dr->connection);
74     return 0;
75   }
76   return -1;
77 }
78 static void noit_stomp_deallocate(iep_thread_driver_t *d) {
79   struct stomp_driver *dr = (struct stomp_driver *)d;
80   if(dr->connection) stomp_disconnect(&dr->connection);
81   if(dr->pool) apr_pool_destroy(dr->pool);
82   if(dr->exchange) free(dr->exchange);
83   if(dr->destination) free(dr->destination);
84   if(dr->user) free(dr->user);
85   if(dr->pass) free(dr->pass);
86   if(dr->hostname) free(dr->hostname);
87   free(dr);
88 }
89 static int noit_stomp_connect(iep_thread_driver_t *dr) {
90   struct stomp_driver *driver = (struct stomp_driver *)dr;
91   apr_status_t rc;
92   stomp_frame frame;
93
94   if(!driver->connection) {
95     if(stomp_connect(&driver->connection, driver->hostname, driver->port,
96                      driver->pool)!= APR_SUCCESS) {
97       noitL(noit_error, "MQ connection failed\n");
98       stomp_disconnect(&driver->connection);
99       return -1;
100     }
101
102     frame.command = "CONNECT";
103     frame.headers = apr_hash_make(driver->pool);
104
105     // This is for RabbitMQ Support
106     if(driver->user && driver->pass) {
107       apr_hash_set(frame.headers, "login",
108                    APR_HASH_KEY_STRING, driver->user);
109       apr_hash_set(frame.headers, "passcode",
110                    APR_HASH_KEY_STRING, driver->pass);
111     }
112     if(driver->exchange)
113       apr_hash_set(frame.headers, "exchange", APR_HASH_KEY_STRING, driver->exchange);
114
115     frame.body = NULL;
116     frame.body_length = -1;
117     rc = stomp_write(driver->connection, &frame, driver->pool);
118     if(rc != APR_SUCCESS) {
119       noitL(noit_error, "MQ STOMP CONNECT failed, %d\n", rc);
120       stomp_disconnect(&driver->connection);
121       return -1;
122     }
123     return 0;
124   }
125   /* 1 means already connected */
126   return 1;
127 }
128 static int noit_stomp_submit(iep_thread_driver_t *dr,
129                              const char *payload, size_t payloadlen) {
130   struct stomp_driver *driver = (struct stomp_driver *)dr;
131   apr_pool_t *dummy;
132   apr_status_t rc;
133   stomp_frame out;
134
135   if(apr_pool_create(&dummy, NULL) != APR_SUCCESS) return -1;
136
137   out.command = "SEND";
138   out.headers = apr_hash_make(dummy);
139   if (driver->exchange)
140     apr_hash_set(out.headers, "exchange",
141                  APR_HASH_KEY_STRING, driver->exchange);
142
143   apr_hash_set(out.headers, "destination",
144                APR_HASH_KEY_STRING, driver->destination);
145   apr_hash_set(out.headers, "ack", APR_HASH_KEY_STRING, "auto");
146  
147   out.body_length = -1;
148   out.body = (char *)payload;
149   rc = stomp_write(driver->connection, &out, dummy);
150   if(rc != APR_SUCCESS) {
151     noitL(noit_error, "STOMP send failed, disconnecting\n");
152     if(driver->connection) stomp_disconnect(&driver->connection);
153     driver->connection = NULL;
154   }
155   apr_pool_destroy(dummy);
156   return (rc == APR_SUCCESS) ? 0 : -1;
157 }
158
159 mq_driver_t mq_driver_stomp = {
160   noit_stomp_allocate,
161   noit_stomp_connect,
162   noit_stomp_submit,
163   noit_stomp_disconnect,
164   noit_stomp_deallocate
165 };
166
167 static int noit_stomp_driver_config(noit_module_generic_t *self, noit_hash_table *o) {
168   return 0;
169 }
170 static int noit_stomp_driver_onload(noit_image_t *self) {
171   return 0;
172 }
173
174 static int noit_stomp_driver_init(noit_module_generic_t *self) {
175   apr_initialize();
176   atexit(apr_terminate);
177   stratcon_iep_mq_driver_register("stomp", &mq_driver_stomp);
178   return 0;
179 }
180
181 noit_module_generic_t stomp_driver = {
182   {
183     NOIT_GENERIC_MAGIC,
184     NOIT_GENERIC_ABI_VERSION,
185     "stomp_driver",
186     "STOMP driver for IEP MQ submission",
187     "",
188     noit_stomp_driver_onload
189   },
190   noit_stomp_driver_config,
191   noit_stomp_driver_init
192 };
193
Note: See TracBrowser for help on using the browser.