root/src/stomp/stomp.c

Revision a9077178423e39a94a9b624e44cd4b37899d6fd3, 12.3 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

refs #122, this stuff works well enough to pull dev back into trunk.
Closing the #122 branch.

flag-day(noitd,stratcond)

  • Property mode set to 100644
Line 
1 /**
2  *
3  * Copyright 2005 LogicBlaze Inc.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */
17
18 #include <stdlib.h>
19 #include <string.h>
20 #include "apr.h"
21 #include "apr_strings.h"
22 #include "stomp.h"
23
24 /********************************************************************************
25  *
26  * Used to establish a connection
27  *
28  ********************************************************************************/
29 APR_DECLARE(apr_status_t) stomp_connect(stomp_connection **connection_ref, const char *hostname, int port, apr_pool_t *pool)
30 {
31         apr_status_t rc;
32         int socket_family;
33         stomp_connection *connection=NULL;
34    
35         //
36         // Allocate the connection and a memory pool for the connection.
37         //
38         connection = apr_pcalloc(pool, sizeof(connection));
39         if( connection == NULL )
40                 return APR_ENOMEM;
41    
42 #define CHECK_SUCCESS if( rc!=APR_SUCCESS ) { return rc; }
43    
44         // Look up the remote address
45         rc = apr_sockaddr_info_get(&connection->remote_sa, hostname, APR_UNSPEC, port, 0, pool);
46         CHECK_SUCCESS;
47        
48         // Create and Connect the socket.
49         socket_family = connection->remote_sa->sa.sin.sin_family;
50         rc = apr_socket_create(&connection->socket, socket_family, SOCK_STREAM, APR_PROTO_TCP, pool);
51         CHECK_SUCCESS; 
52    rc = apr_socket_connect(connection->socket, connection->remote_sa);
53         CHECK_SUCCESS;
54    
55    // Get the Socket Info
56    rc = apr_socket_addr_get(&connection->remote_sa, APR_REMOTE, connection->socket);
57         CHECK_SUCCESS;
58    rc = apr_sockaddr_ip_get(&connection->remote_ip, connection->remote_sa);
59         CHECK_SUCCESS;
60    rc = apr_socket_addr_get(&connection->local_sa, APR_LOCAL, connection->socket);
61         CHECK_SUCCESS;
62    rc = apr_sockaddr_ip_get(&connection->local_ip, connection->local_sa);
63         CHECK_SUCCESS; 
64    
65    // Set socket options.
66    //   rc = apr_socket_timeout_set( connection->socket, 2*APR_USEC_PER_SEC);
67    //   CHECK_SUCCESS;
68    
69 #undef CHECK_SUCCESS
70    
71         *connection_ref = connection;
72         return rc;     
73 }
74
75 APR_DECLARE(apr_status_t) stomp_disconnect(stomp_connection **connection_ref)
76 {
77    apr_status_t result, rc;
78         stomp_connection *connection = *connection_ref;
79    
80    if( connection_ref == NULL || *connection_ref==NULL )
81       return APR_EGENERAL;
82    
83         result = APR_SUCCESS;   
84    rc = apr_socket_shutdown(connection->socket, APR_SHUTDOWN_WRITE);   
85         if( result!=APR_SUCCESS )
86                 result = rc;
87    
88    if( connection->socket != NULL ) {
89       rc = apr_socket_close(connection->socket);
90       if( result!=APR_SUCCESS )
91          result = rc;
92       connection->socket=NULL;
93    }   
94         *connection_ref=NULL;
95         return rc;     
96 }
97
98 /********************************************************************************
99  *
100  * Wrappers around the apr_socket_send and apr_socket_recv calls so that they
101  * read/write their buffers fully.
102  *
103  ********************************************************************************/
104 APR_DECLARE(apr_status_t) stomp_write_buffer(stomp_connection *connection, const char *data, apr_size_t size)
105 {
106    apr_size_t remaining = size;
107    size=0;
108         while( remaining>0 ) {
109                 apr_size_t length = remaining;
110                 apr_status_t rc = apr_socket_send(connection->socket, data, &length);
111       data+=length;
112       remaining -= length;
113       //      size += length;
114       if( rc != APR_SUCCESS ) {
115          return rc;
116       }
117         }
118         return APR_SUCCESS;
119 }
120
/*
 * One fixed-size chunk of received bytes.  The read functions chain these
 * blocks into a singly linked list while the total size of the incoming
 * data is not yet known.
 */
struct data_block_list {
   char data[1024];               /* storage for up to 1024 received bytes */
   struct data_block_list *next;  /* following block, or NULL for the last */
};
typedef struct data_block_list data_block_list;
125
126 APR_DECLARE(apr_status_t) stomp_read_line(stomp_connection *connection, char **data, int* length, apr_pool_t *pool)
127 {
128    apr_pool_t *tpool;
129    apr_status_t rc;
130    data_block_list *head, *tail;
131    apr_size_t i=0;
132    apr_size_t bytesRead=0;
133    char *p;
134    
135    rc = apr_pool_create(&tpool, pool);
136    if( rc != APR_SUCCESS ) {
137       return rc;
138    }
139      
140    head = tail = apr_pcalloc(tpool, sizeof(data_block_list));
141    if( head == NULL )
142       return APR_ENOMEM;
143
144 #define CHECK_SUCCESS if( rc!=APR_SUCCESS ) { apr_pool_destroy(tpool);  return rc; }
145        
146    while( 1 ) {
147      
148           apr_size_t length = 1;
149       apr_status_t rc = apr_socket_recv(connection->socket, tail->data+i, &length);
150       CHECK_SUCCESS;
151      
152       if( length==1 ) {
153          i++;
154          bytesRead++;
155          
156          // Keep reading bytes till end of line
157          if( tail->data[i-1]=='\n') {
158             // Null terminate the string instead of having the newline
159                     tail->data[i-1] = 0;
160                         break;
161          } else if( tail->data[i-1]==0 ) {
162                         // Encountered 0 before end of line
163                         apr_pool_destroy(tpool);
164                         return APR_EGENERAL;
165                  }
166          
167          // Do we need to allocate a new block?
168          if( i >= sizeof( tail->data) ) {           
169             tail->next = apr_pcalloc(tpool, sizeof(data_block_list));
170             if( tail->next == NULL ) {
171                apr_pool_destroy(tpool);
172                return APR_ENOMEM;
173             }
174             tail=tail->next;
175             i=0;
176          }
177       }     
178         }
179
180 #undef CHECK_SUCCESS
181    // Now we have the whole frame and know how big it is.  Allocate it's buffer
182    *data = apr_pcalloc(pool, bytesRead);
183    p = *data;
184    if( p==NULL ) {
185       apr_pool_destroy(tpool);
186       return APR_ENOMEM;
187    }
188
189    // Copy the frame over to the new buffer.
190    *length = bytesRead - 1;
191    for( ;head != NULL; head = head->next ) {
192       int len = bytesRead > sizeof(head->data) ? sizeof(head->data) : bytesRead;
193       memcpy(p,head->data,len);
194       p+=len;
195       bytesRead-=len;
196    }
197    
198    apr_pool_destroy(tpool);
199    return APR_SUCCESS;
200 }
201
202 APR_DECLARE(apr_status_t) stomp_read_buffer(stomp_connection *connection, char **data, apr_pool_t *pool)
203 {
204    apr_pool_t *tpool;
205    apr_status_t rc;
206    data_block_list *head, *tail;
207    apr_size_t i=0;
208    apr_size_t bytesRead=0;
209    char *p;
210    
211    rc = apr_pool_create(&tpool, pool);
212    if( rc != APR_SUCCESS ) {
213       return rc;
214    }
215      
216    head = tail = apr_pcalloc(tpool, sizeof(data_block_list));
217    if( head == NULL )
218       return APR_ENOMEM;
219    
220 #define CHECK_SUCCESS if( rc!=APR_SUCCESS ) { apr_pool_destroy(tpool);  return rc; }
221    
222    // Keep reading bytes till end of frame is encountered.
223         while( 1 ) {
224      
225                 apr_size_t length = 1;
226       apr_status_t rc = apr_socket_recv(connection->socket, tail->data+i, &length);
227       CHECK_SUCCESS;
228      
229       if( length==1 ) {
230          i++;
231          bytesRead++;
232          
233          // Keep reading bytes till end of frame
234          if( tail->data[i-1]==0 ) {
235             char endline[1];
236             // We expect a newline after the null.
237             apr_socket_recv(connection->socket, endline, &length);
238             CHECK_SUCCESS;
239             if( endline[0] != '\n' ) {
240                return APR_EGENERAL;
241             }
242             break;
243          }
244          
245          // Do we need to allocate a new block?
246          if( i >= sizeof( tail->data) ) {           
247             tail->next = apr_pcalloc(tpool, sizeof(data_block_list));
248             if( tail->next == NULL ) {
249                apr_pool_destroy(tpool);
250                return APR_ENOMEM;
251             }
252             tail=tail->next;
253             i=0;
254          }
255       }     
256         }
257 #undef CHECK_SUCCESS
258    
259    // Now we have the whole frame and know how big it is.  Allocate it's buffer
260    *data = apr_pcalloc(pool, bytesRead);
261    p = *data;
262    if( p==NULL ) {
263       apr_pool_destroy(tpool);
264       return APR_ENOMEM;
265    }
266    
267    // Copy the frame over to the new buffer.
268    for( ;head != NULL; head = head->next ) {
269       int len = bytesRead > sizeof(head->data) ? sizeof(head->data) : bytesRead;
270       memcpy(p,head->data,len);
271       p+=len;
272       bytesRead-=len;
273    }
274    
275    apr_pool_destroy(tpool);
276         return APR_SUCCESS;
277 }
278
279 /********************************************************************************
280  *
281  * Handles reading and writing stomp_frames to and from the connection
282  *
283  ********************************************************************************/
284
285 APR_DECLARE(apr_status_t) stomp_write(stomp_connection *connection, stomp_frame *frame, apr_pool_t* pool) {
286    apr_status_t rc;
287    
288 #define CHECK_SUCCESS if( rc!=APR_SUCCESS ) { return rc; }
289    // Write the command.
290    rc = stomp_write_buffer(connection, frame->command, strlen(frame->command));
291    CHECK_SUCCESS;               
292    rc = stomp_write_buffer(connection, "\n", 1);
293    CHECK_SUCCESS;
294    
295    // Write the headers
296    if( frame->headers != NULL ) {
297      
298       apr_hash_index_t *i;
299       const void *key;
300       void *value;
301       for (i = apr_hash_first(NULL, frame->headers); i; i = apr_hash_next(i)) {
302          apr_hash_this(i, &key, NULL, &value);
303          
304          rc = stomp_write_buffer(connection, key, strlen(key));
305          CHECK_SUCCESS;
306          rc = stomp_write_buffer(connection, ":", 1);
307          CHECK_SUCCESS;
308          rc = stomp_write_buffer(connection, value, strlen(value));
309          CHECK_SUCCESS;
310          rc = stomp_write_buffer(connection, "\n", 1);
311          CHECK_SUCCESS; 
312       }
313
314           if(frame->body_length >= 0) {
315                   apr_pool_t *length_pool;
316                   char *length_string;
317
318                   apr_pool_create(&length_pool, pool);
319                   rc = stomp_write_buffer(connection, "content-length:", 15);
320                   CHECK_SUCCESS;
321                  
322                   length_string = apr_itoa(length_pool, frame->body_length);
323                   rc = stomp_write_buffer(connection, length_string, strlen(length_string));
324                   CHECK_SUCCESS;
325                   rc = stomp_write_buffer(connection, "\n", 1);
326                   CHECK_SUCCESS;
327
328                   apr_pool_destroy(length_pool);
329           }
330    }
331    rc = stomp_write_buffer(connection, "\n", 1);
332    CHECK_SUCCESS;
333    
334    // Write the body.
335    if( frame->body != NULL ) {
336       int body_length = frame->body_length;
337           if(body_length < 0)
338                   body_length = strlen(frame->body);
339       rc = stomp_write_buffer(connection, frame->body, body_length);
340       CHECK_SUCCESS;
341    }
342    rc = stomp_write_buffer(connection, "\0\n", 2);
343    CHECK_SUCCESS;
344      
345 #undef CHECK_SUCCESS
346                    
347    return APR_SUCCESS;
348 }
349
350 APR_DECLARE(apr_status_t) stomp_read(stomp_connection *connection, stomp_frame **frame, apr_pool_t *pool) {
351    
352    apr_status_t rc;
353    stomp_frame *f;
354      
355    f = apr_pcalloc(pool, sizeof(stomp_frame));
356    if( f == NULL )
357       return APR_ENOMEM;
358    
359    f->headers = apr_hash_make(pool);
360    if( f->headers == NULL )
361       return APR_ENOMEM;
362          
363 #define CHECK_SUCCESS if( rc!=APR_SUCCESS ) { return rc; }
364    
365    // Parse the frame out.
366    {
367       char *p;
368           int length;
369      
370       // Parse the command.
371           rc = stomp_read_line(connection, &p, &length, pool);
372           CHECK_SUCCESS;
373
374       f->command = p;
375      
376       // Start parsing the headers.
377       while( 1 ) {
378          rc = stomp_read_line(connection, &p, &length, pool);
379                  CHECK_SUCCESS;
380                  
381                  // Done with headers
382                  if(length == 0)
383                         break;
384
385          {
386             // Parse the header line.
387             char *p2;
388             void *key;
389             void *value;
390            
391             p2 = strstr(p,":");
392             if( p2 == NULL ) {
393                // Expected at 1 : to delimit the key from the value.
394                return APR_EGENERAL;
395             }
396            
397             // Null terminate the key
398             *p2=0;           
399             key = p;
400            
401             // The rest if the value.
402             value = p2+1;
403            
404             // Insert key/value into hash table.
405             apr_hash_set(f->headers, key, APR_HASH_KEY_STRING, value);           
406          }
407       }
408      
409       // Check for content length
410           {
411                   char* content_length = apr_hash_get(f->headers, "content-length", APR_HASH_KEY_STRING);
412                   if(content_length) {
413                           char endbuffer[2];
414                           apr_size_t length = 2;
415
416                           f->body_length = atoi(content_length);
417                           f->body = apr_pcalloc(pool, f->body_length);
418                           rc = apr_socket_recv(connection->socket, f->body, &f->body_length);
419                           CHECK_SUCCESS;
420
421                           // Expect a \n after the end
422                           rc = apr_socket_recv(connection->socket, endbuffer, &length);
423                           CHECK_SUCCESS;
424                           if(length != 2 || endbuffer[0] != '\0' || endbuffer[1] != '\n')
425                                   return APR_EGENERAL;
426                   }
427                   else
428                   {
429                           // The remainder of the buffer (including the \n at the end) is the body)
430                           rc = stomp_read_buffer(connection, &f->body, pool);
431                           CHECK_SUCCESS;
432                   }
433           }
434    }
435    
436 #undef CHECK_SUCCESS
437    *frame = f;
438         return APR_SUCCESS;
439 }
Note: See TracBrowser for help on using the browser.