root/src/modules/librabbitmq/amqp_table.c

Revision 1e8ae2b61ace7f9ef47a4f5c190bd2df06b04588, 4.0 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 3 years ago)

initial import of a failover-aware AMQP client courtesy of Circonus

  • Property mode set to 100644
Line 
1 #include <stdlib.h>
2 #include <stdio.h>
3 #include <string.h>
4 #include <stdint.h>
5 #include <errno.h>
6
7 #include "amqp.h"
8 #include "amqp_private.h"
9
10 #include <assert.h>
11
12 #define INITIAL_TABLE_SIZE 16
13
14 int amqp_decode_table(amqp_bytes_t encoded,
15                       amqp_pool_t *pool,
16                       amqp_table_t *output,
17                       int *offsetptr)
18 {
19   int offset = *offsetptr;
20   uint32_t tablesize = D_32(encoded, offset);
21   int num_entries = 0;
22   amqp_table_entry_t *entries = malloc(INITIAL_TABLE_SIZE * sizeof(amqp_table_entry_t));
23   int allocated_entries = INITIAL_TABLE_SIZE;
24   int limit;
25
26   if (entries == NULL) {
27     return -ENOMEM;
28   }
29
30   offset += 4;
31   limit = offset + tablesize;
32
33   while (offset < limit) {
34     size_t keylen;
35     amqp_table_entry_t *entry;
36
37     keylen = D_8(encoded, offset);
38     offset++;
39
40     if (num_entries >= allocated_entries) {
41       void *newentries;
42       allocated_entries = allocated_entries * 2;
43       newentries = realloc(entries, allocated_entries * sizeof(amqp_table_entry_t));
44       if (newentries == NULL) {
45         free(entries);
46         return -ENOMEM;
47       }
48       entries = newentries;
49     }
50     entry = &entries[num_entries];
51
52     entry->key.len = keylen;
53     entry->key.bytes = D_BYTES(encoded, offset, keylen);
54     offset += keylen;
55
56     entry->kind = D_8(encoded, offset);
57     offset++;
58
59     switch (entry->kind) {
60       case 'S':
61         entry->value.bytes.len = D_32(encoded, offset);
62         offset += 4;
63         entry->value.bytes.bytes = D_BYTES(encoded, offset, entry->value.bytes.len);
64         offset += entry->value.bytes.len;
65         break;
66       case 'I':
67         entry->value.i32 = (int32_t) D_32(encoded, offset);
68         offset += 4;
69         break;
70       case 'D':
71         entry->value.decimal.decimals = D_8(encoded, offset);
72         offset++;
73         entry->value.decimal.value = D_32(encoded, offset);
74         offset += 4;
75         break;
76       case 'T':
77         entry->value.u64 = D_64(encoded, offset);
78         offset += 8;
79         break;
80       case 'F':
81         AMQP_CHECK_RESULT(amqp_decode_table(encoded, pool, &(entry->value.table), &offset));
82         break;
83       default:
84         free(entries);
85         return -EINVAL;
86     }
87
88     num_entries++;
89   }
90
91   output->num_entries = num_entries;
92   output->entries = amqp_pool_alloc(pool, num_entries * sizeof(amqp_table_entry_t));
93   memcpy(output->entries, entries, num_entries * sizeof(amqp_table_entry_t));
94   free(entries);
95
96   *offsetptr = offset;
97   return 0;
98 }
99
100 int amqp_encode_table(amqp_bytes_t encoded,
101                       amqp_table_t *input,
102                       int *offsetptr)
103 {
104   int offset = *offsetptr;
105   int tablesize_offset = offset;
106   int i;
107
108   offset += 4; /* skip space for the size of the table to be filled in later */
109
110   for (i = 0; i < input->num_entries; i++) {
111     amqp_table_entry_t *entry = &(input->entries[i]);
112
113     E_8(encoded, offset, entry->key.len);
114     offset++;
115
116     E_BYTES(encoded, offset, entry->key.len, entry->key.bytes);
117     offset += entry->key.len;
118
119     E_8(encoded, offset, entry->kind);
120     offset++;
121
122     switch (entry->kind) {
123       case 'S':
124         E_32(encoded, offset, entry->value.bytes.len);
125         offset += 4;
126         E_BYTES(encoded, offset, entry->value.bytes.len, entry->value.bytes.bytes);
127         offset += entry->value.bytes.len;
128         break;
129       case 'I':
130         E_32(encoded, offset, (uint32_t) entry->value.i32);
131         offset += 4;
132         break;
133       case 'D':
134         E_8(encoded, offset, entry->value.decimal.decimals);
135         offset++;
136         E_32(encoded, offset, entry->value.decimal.value);
137         offset += 4;
138         break;
139       case 'T':
140         E_64(encoded, offset, entry->value.u64);
141         offset += 8;
142         break;
143       case 'F':
144         AMQP_CHECK_RESULT(amqp_encode_table(encoded, &(entry->value.table), &offset));
145         break;
146       default:
147         return -EINVAL;
148     }
149   }
150
151   E_32(encoded, tablesize_offset, (offset - *offsetptr - 4));
152   *offsetptr = offset;
153   return 0;
154 }
155
156 int amqp_table_entry_cmp(void const *entry1, void const *entry2) {
157   amqp_table_entry_t const *p1 = (amqp_table_entry_t const *) entry1;
158   amqp_table_entry_t const *p2 = (amqp_table_entry_t const *) entry2;
159
160   int d;
161   int minlen;
162
163   minlen = p1->key.len;
164   if (p2->key.len < minlen) minlen = p2->key.len;
165
166   d = memcmp(p1->key.bytes, p2->key.bytes, minlen);
167   if (d != 0) {
168     return d;
169   }
170
171   return p1->key.len - p2->key.len;
172 }
Note: See TracBrowser for help on using the browser.