root/src/modules/libstomp.c

Revision 7afb4e334fa8390d0543fc8d916d5c6b861511a9, 12.5 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 4 years ago)

fixes #219

This is "significant" as it requires adding a module section to
stratcon.conf and not using the <stomp> stanza, but instead using
<mq type="stomp">. I've tested it with ActiveMQ and RabbitMQ and
both work fine.

  • Property mode set to 100644
Line 
1 /**
2  *
3  * Copyright 2005 LogicBlaze Inc.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */
17
18 #include <stdlib.h>
19 #include <string.h>
20 #include "apr.h"
21 #include "apr_strings.h"
22 #include "libstomp.h"
23
24 /********************************************************************************
25  *
26  * Used to establish a connection
27  *
28  ********************************************************************************/
29 APR_DECLARE(apr_status_t) stomp_connect(stomp_connection **connection_ref, const char *hostname, int port, apr_pool_t *pool)
30 {
31         apr_status_t rc;
32         int socket_family;
33         stomp_connection *connection=NULL;
34    
35         //
36         // Allocate the connection and a memory pool for the connection.
37         //
38         connection = apr_pcalloc(pool, sizeof(*connection));
39         if( connection == NULL )
40                 return APR_ENOMEM;
41    
42 #define CHECK_SUCCESS if( rc!=APR_SUCCESS ) { return rc; }
43    
44         // Look up the remote address
45         rc = apr_sockaddr_info_get(&connection->remote_sa, hostname, APR_UNSPEC, port, 0, pool);
46         CHECK_SUCCESS;
47        
48         // Create and Connect the socket.
49         socket_family = connection->remote_sa->sa.sin.sin_family;
50         rc = apr_socket_create(&connection->socket, socket_family, SOCK_STREAM, APR_PROTO_TCP, pool);
51         CHECK_SUCCESS; 
52 #undef CHECK_SUCCESS
53
54 #define CHECK_SUCCESS if( rc!=APR_SUCCESS ) { apr_socket_close(connection->socket); return rc; }
55    rc = apr_socket_connect(connection->socket, connection->remote_sa);
56         CHECK_SUCCESS;
57    
58    // Get the Socket Info
59    rc = apr_socket_addr_get(&connection->remote_sa, APR_REMOTE, connection->socket);
60         CHECK_SUCCESS;
61    rc = apr_sockaddr_ip_get(&connection->remote_ip, connection->remote_sa);
62         CHECK_SUCCESS;
63    rc = apr_socket_addr_get(&connection->local_sa, APR_LOCAL, connection->socket);
64         CHECK_SUCCESS;
65    rc = apr_sockaddr_ip_get(&connection->local_ip, connection->local_sa);
66         CHECK_SUCCESS; 
67    
68    // Set socket options.
69    //   rc = apr_socket_timeout_set( connection->socket, 2*APR_USEC_PER_SEC);
70    //   CHECK_SUCCESS;
71    
72 #undef CHECK_SUCCESS
73    
74         *connection_ref = connection;
75         return rc;     
76 }
77
78 APR_DECLARE(apr_status_t) stomp_disconnect(stomp_connection **connection_ref)
79 {
80    apr_status_t result, rc;
81         stomp_connection *connection = *connection_ref;
82    
83    if( connection_ref == NULL || *connection_ref==NULL )
84       return APR_EGENERAL;
85    
86         result = APR_SUCCESS;   
87    rc = apr_socket_shutdown(connection->socket, APR_SHUTDOWN_WRITE);   
88         if( result!=APR_SUCCESS )
89                 result = rc;
90    
91    if( connection->socket != NULL ) {
92       rc = apr_socket_close(connection->socket);
93       if( result!=APR_SUCCESS )
94          result = rc;
95       connection->socket=NULL;
96    }   
97         *connection_ref=NULL;
98         return rc;     
99 }
100
101 /********************************************************************************
102  *
103  * Wrappers around the apr_socket_send and apr_socket_recv calls so that they
104  * read/write their buffers fully.
105  *
106  ********************************************************************************/
107 APR_DECLARE(apr_status_t) stomp_write_buffer(stomp_connection *connection, const char *data, apr_size_t size)
108 {
109    apr_size_t remaining = size;
110    size=0;
111         while( remaining>0 ) {
112                 apr_size_t length = remaining;
113                 apr_status_t rc = apr_socket_send(connection->socket, data, &length);
114       data+=length;
115       remaining -= length;
116       //      size += length;
117       if( rc != APR_SUCCESS ) {
118          return rc;
119       }
120         }
121         return APR_SUCCESS;
122 }
123
/*
 * One fixed-size chunk of received bytes. The read functions chain these
 * together while the total size of the incoming data is still unknown.
 */
struct data_block_list {
   char data[1024];               /* bytes received into this chunk */
   struct data_block_list *next;  /* following chunk, NULL on the last one */
};

typedef struct data_block_list data_block_list;
128
129 APR_DECLARE(apr_status_t) stomp_read_line(stomp_connection *connection, char **data, int* length, apr_pool_t *pool)
130 {
131    apr_pool_t *tpool;
132    apr_status_t rc;
133    data_block_list *head, *tail;
134    apr_size_t i=0;
135    apr_size_t bytesRead=0;
136    char *p;
137    
138    rc = apr_pool_create(&tpool, pool);
139    if( rc != APR_SUCCESS ) {
140       return rc;
141    }
142      
143    head = tail = apr_pcalloc(tpool, sizeof(data_block_list));
144    if( head == NULL )
145       return APR_ENOMEM;
146
147 #define CHECK_SUCCESS if( rc!=APR_SUCCESS ) { apr_pool_destroy(tpool);  return rc; }
148        
149    while( 1 ) {
150      
151           apr_size_t length = 1;
152       apr_status_t rc = apr_socket_recv(connection->socket, tail->data+i, &length);
153       CHECK_SUCCESS;
154      
155       if( length==1 ) {
156          i++;
157          bytesRead++;
158          
159          // Keep reading bytes till end of line
160          if( tail->data[i-1]=='\n') {
161             // Null terminate the string instead of having the newline
162                     tail->data[i-1] = 0;
163                         break;
164          } else if( tail->data[i-1]==0 ) {
165                         // Encountered 0 before end of line
166                         apr_pool_destroy(tpool);
167                         return APR_EGENERAL;
168                  }
169          
170          // Do we need to allocate a new block?
171          if( i >= sizeof( tail->data) ) {           
172             tail->next = apr_pcalloc(tpool, sizeof(data_block_list));
173             if( tail->next == NULL ) {
174                apr_pool_destroy(tpool);
175                return APR_ENOMEM;
176             }
177             tail=tail->next;
178             i=0;
179          }
180       }     
181         }
182
183 #undef CHECK_SUCCESS
184    // Now we have the whole frame and know how big it is.  Allocate it's buffer
185    *data = apr_pcalloc(pool, bytesRead);
186    p = *data;
187    if( p==NULL ) {
188       apr_pool_destroy(tpool);
189       return APR_ENOMEM;
190    }
191
192    // Copy the frame over to the new buffer.
193    *length = bytesRead - 1;
194    for( ;head != NULL; head = head->next ) {
195       int len = bytesRead > sizeof(head->data) ? sizeof(head->data) : bytesRead;
196       memcpy(p,head->data,len);
197       p+=len;
198       bytesRead-=len;
199    }
200    
201    apr_pool_destroy(tpool);
202    return APR_SUCCESS;
203 }
204
205 APR_DECLARE(apr_status_t) stomp_read_buffer(stomp_connection *connection, char **data, apr_pool_t *pool)
206 {
207    apr_pool_t *tpool;
208    apr_status_t rc;
209    data_block_list *head, *tail;
210    apr_size_t i=0;
211    apr_size_t bytesRead=0;
212    char *p;
213    
214    rc = apr_pool_create(&tpool, pool);
215    if( rc != APR_SUCCESS ) {
216       return rc;
217    }
218      
219    head = tail = apr_pcalloc(tpool, sizeof(data_block_list));
220    if( head == NULL )
221       return APR_ENOMEM;
222    
223 #define CHECK_SUCCESS if( rc!=APR_SUCCESS ) { apr_pool_destroy(tpool);  return rc; }
224    
225    // Keep reading bytes till end of frame is encountered.
226         while( 1 ) {
227      
228                 apr_size_t length = 1;
229       apr_status_t rc = apr_socket_recv(connection->socket, tail->data+i, &length);
230       CHECK_SUCCESS;
231      
232       if( length==1 ) {
233          i++;
234          bytesRead++;
235          
236          // Keep reading bytes till end of frame
237          if( tail->data[i-1]==0 ) {
238             char endline[1];
239             // We expect a newline after the null.
240             apr_socket_recv(connection->socket, endline, &length);
241             CHECK_SUCCESS;
242             if( endline[0] != '\n' ) {
243                return APR_EGENERAL;
244             }
245             break;
246          }
247          
248          // Do we need to allocate a new block?
249          if( i >= sizeof( tail->data) ) {           
250             tail->next = apr_pcalloc(tpool, sizeof(data_block_list));
251             if( tail->next == NULL ) {
252                apr_pool_destroy(tpool);
253                return APR_ENOMEM;
254             }
255             tail=tail->next;
256             i=0;
257          }
258       }     
259         }
260 #undef CHECK_SUCCESS
261    
262    // Now we have the whole frame and know how big it is.  Allocate it's buffer
263    *data = apr_pcalloc(pool, bytesRead);
264    p = *data;
265    if( p==NULL ) {
266       apr_pool_destroy(tpool);
267       return APR_ENOMEM;
268    }
269    
270    // Copy the frame over to the new buffer.
271    for( ;head != NULL; head = head->next ) {
272       int len = bytesRead > sizeof(head->data) ? sizeof(head->data) : bytesRead;
273       memcpy(p,head->data,len);
274       p+=len;
275       bytesRead-=len;
276    }
277    
278    apr_pool_destroy(tpool);
279         return APR_SUCCESS;
280 }
281
282 /********************************************************************************
283  *
284  * Handles reading and writing stomp_frames to and from the connection
285  *
286  ********************************************************************************/
287
288 APR_DECLARE(apr_status_t) stomp_write(stomp_connection *connection, stomp_frame *frame, apr_pool_t* pool) {
289    apr_status_t rc;
290    
291 #define CHECK_SUCCESS if( rc!=APR_SUCCESS ) { return rc; }
292    // Write the command.
293    rc = stomp_write_buffer(connection, frame->command, strlen(frame->command));
294    CHECK_SUCCESS;               
295    rc = stomp_write_buffer(connection, "\n", 1);
296    CHECK_SUCCESS;
297    
298    // Write the headers
299    if( frame->headers != NULL ) {
300      
301       apr_hash_index_t *i;
302       const void *key;
303       void *value;
304       for (i = apr_hash_first(NULL, frame->headers); i; i = apr_hash_next(i)) {
305          apr_hash_this(i, &key, NULL, &value);
306          
307          rc = stomp_write_buffer(connection, key, strlen(key));
308          CHECK_SUCCESS;
309          rc = stomp_write_buffer(connection, ":", 1);
310          CHECK_SUCCESS;
311          rc = stomp_write_buffer(connection, value, strlen(value));
312          CHECK_SUCCESS;
313          rc = stomp_write_buffer(connection, "\n", 1);
314          CHECK_SUCCESS; 
315       }
316
317           if(frame->body_length >= 0) {
318                   apr_pool_t *length_pool;
319                   char *length_string;
320
321                   apr_pool_create(&length_pool, pool);
322                   rc = stomp_write_buffer(connection, "content-length:", 15);
323                   CHECK_SUCCESS;
324                  
325                   length_string = apr_itoa(length_pool, frame->body_length);
326                   rc = stomp_write_buffer(connection, length_string, strlen(length_string));
327                   CHECK_SUCCESS;
328                   rc = stomp_write_buffer(connection, "\n", 1);
329                   CHECK_SUCCESS;
330
331                   apr_pool_destroy(length_pool);
332           }
333    }
334    rc = stomp_write_buffer(connection, "\n", 1);
335    CHECK_SUCCESS;
336    
337    // Write the body.
338    if( frame->body != NULL ) {
339       int body_length = frame->body_length;
340           if(body_length < 0)
341                   body_length = strlen(frame->body);
342       rc = stomp_write_buffer(connection, frame->body, body_length);
343       CHECK_SUCCESS;
344    }
345    rc = stomp_write_buffer(connection, "\0\n", 2);
346    CHECK_SUCCESS;
347      
348 #undef CHECK_SUCCESS
349                    
350    return APR_SUCCESS;
351 }
352
353 APR_DECLARE(apr_status_t) stomp_read(stomp_connection *connection, stomp_frame **frame, apr_pool_t *pool) {
354    
355    apr_status_t rc;
356    stomp_frame *f;
357      
358    f = apr_pcalloc(pool, sizeof(stomp_frame));
359    if( f == NULL )
360       return APR_ENOMEM;
361    
362    f->headers = apr_hash_make(pool);
363    if( f->headers == NULL )
364       return APR_ENOMEM;
365          
366 #define CHECK_SUCCESS if( rc!=APR_SUCCESS ) { return rc; }
367    
368    // Parse the frame out.
369    {
370       char *p;
371           int length;
372      
373       // Parse the command.
374           rc = stomp_read_line(connection, &p, &length, pool);
375           CHECK_SUCCESS;
376
377       f->command = p;
378      
379       // Start parsing the headers.
380       while( 1 ) {
381          rc = stomp_read_line(connection, &p, &length, pool);
382                  CHECK_SUCCESS;
383                  
384                  // Done with headers
385                  if(length == 0)
386                         break;
387
388          {
389             // Parse the header line.
390             char *p2;
391             void *key;
392             void *value;
393            
394             p2 = strstr(p,":");
395             if( p2 == NULL ) {
396                // Expected at 1 : to delimit the key from the value.
397                return APR_EGENERAL;
398             }
399            
400             // Null terminate the key
401             *p2=0;           
402             key = p;
403            
404             // The rest if the value.
405             value = p2+1;
406            
407             // Insert key/value into hash table.
408             apr_hash_set(f->headers, key, APR_HASH_KEY_STRING, value);           
409          }
410       }
411      
412       // Check for content length
413           {
414                   char* content_length = apr_hash_get(f->headers, "content-length", APR_HASH_KEY_STRING);
415                   if(content_length) {
416                           char endbuffer[2];
417                           apr_size_t length = 2;
418
419                           f->body_length = atoi(content_length);
420                           f->body = apr_pcalloc(pool, f->body_length);
421                           rc = apr_socket_recv(connection->socket, f->body, &f->body_length);
422                           CHECK_SUCCESS;
423
424                           // Expect a \n after the end
425                           rc = apr_socket_recv(connection->socket, endbuffer, &length);
426                           CHECK_SUCCESS;
427                           if(length != 2 || endbuffer[0] != '\0' || endbuffer[1] != '\n')
428                                   return APR_EGENERAL;
429                   }
430                   else
431                   {
432                           // The remainder of the buffer (including the \n at the end) is the body)
433                           rc = stomp_read_buffer(connection, &f->body, pool);
434                           CHECK_SUCCESS;
435                   }
436           }
437    }
438    
439 #undef CHECK_SUCCESS
440    *frame = f;
441         return APR_SUCCESS;
442 }
Note: See TracBrowser for help on using the browser.