root/trunk/ext/pg_amqp/librabbitmq/amqp.h

Revision 73, 10.5 kB (checked in by jesus, 4 years ago)

exchange declare, lots of fix up and autonomous publishing + comments on functions.

Line 
1 #ifndef librabbitmq_amqp_h
2 #define librabbitmq_amqp_h
3
4 #ifdef __cplusplus
5 extern "C" {
6 #endif
7
8 typedef int amqp_boolean_t;
9 typedef uint32_t amqp_method_number_t;
10 typedef uint32_t amqp_flags_t;
11 typedef uint16_t amqp_channel_t;
12
13 typedef struct amqp_bytes_t_ {
14   size_t len;
15   void *bytes;
16 } amqp_bytes_t;
17
18 #define AMQP_EMPTY_BYTES ((amqp_bytes_t) { .len = 0, .bytes = NULL })
19
20 typedef struct amqp_decimal_t_ {
21   int decimals;
22   uint32_t value;
23 } amqp_decimal_t;
24
25 #define AMQP_DECIMAL(d,v) ((amqp_decimal_t) { .decimals = (d), .value = (v) })
26
27 typedef struct amqp_table_t_ {
28   int num_entries;
29   struct amqp_table_entry_t_ *entries;
30 } amqp_table_t;
31
32 #define AMQP_EMPTY_TABLE ((amqp_table_t) { .num_entries = 0, .entries = NULL })
33
34 typedef struct amqp_table_entry_t_ {
35   amqp_bytes_t key;
36   char kind;
37   union {
38     amqp_bytes_t bytes;
39     int32_t i32;
40     amqp_decimal_t decimal;
41     uint64_t u64;
42     amqp_table_t table;
43   } value;
44 } amqp_table_entry_t;
45
46 #define _AMQP_TE_INIT(ke,ki,v) { .key = (ke), .kind = (ki), .value = { v } }
47 #define AMQP_TABLE_ENTRY_S(k,v) _AMQP_TE_INIT(amqp_cstring_bytes(k), 'S', .bytes = (v))
48 #define AMQP_TABLE_ENTRY_I(k,v) _AMQP_TE_INIT(amqp_cstring_bytes(k), 'I', .i32 = (v))
49 #define AMQP_TABLE_ENTRY_D(k,v) _AMQP_TE_INIT(amqp_cstring_bytes(k), 'D', .decimal = (v))
50 #define AMQP_TABLE_ENTRY_T(k,v) _AMQP_TE_INIT(amqp_cstring_bytes(k), 'T', .u64 = (v))
51 #define AMQP_TABLE_ENTRY_F(k,v) _AMQP_TE_INIT(amqp_cstring_bytes(k), 'F', .table = (v))
52
53 typedef struct amqp_pool_blocklist_t_ {
54   int num_blocks;
55   void **blocklist;
56 } amqp_pool_blocklist_t;
57
58 typedef struct amqp_pool_t_ {
59   size_t pagesize;
60
61   amqp_pool_blocklist_t pages;
62   amqp_pool_blocklist_t large_blocks;
63
64   int next_page;
65   char *alloc_block;
66   size_t alloc_used;
67 } amqp_pool_t;
68
69 typedef struct amqp_method_t_ {
70   amqp_method_number_t id;
71   void *decoded;
72 } amqp_method_t;
73
74 typedef struct amqp_frame_t_ {
75   uint8_t frame_type; /* 0 means no event */
76   amqp_channel_t channel;
77   union {
78     amqp_method_t method;
79     struct {
80       uint16_t class_id;
81       uint64_t body_size;
82       void *decoded;
83     } properties;
84     amqp_bytes_t body_fragment;
85     struct {
86       uint8_t transport_high;
87       uint8_t transport_low;
88       uint8_t protocol_version_major;
89       uint8_t protocol_version_minor;
90     } protocol_header;
91   } payload;
92 } amqp_frame_t;
93
94 typedef enum amqp_response_type_enum_ {
95   AMQP_RESPONSE_NONE = 0,
96   AMQP_RESPONSE_NORMAL,
97   AMQP_RESPONSE_LIBRARY_EXCEPTION,
98   AMQP_RESPONSE_SERVER_EXCEPTION
99 } amqp_response_type_enum;
100
101 typedef struct amqp_rpc_reply_t_ {
102   amqp_response_type_enum reply_type;
103   amqp_method_t reply;
104   int library_errno; /* if AMQP_RESPONSE_LIBRARY_EXCEPTION, then 0 here means socket EOF */
105 } amqp_rpc_reply_t;
106
107 typedef enum amqp_sasl_method_enum_ {
108   AMQP_SASL_METHOD_PLAIN = 0
109 } amqp_sasl_method_enum;
110
111 #define AMQP_PSEUDOFRAME_PROTOCOL_HEADER ((uint8_t) 'A')
112 #define AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL ((amqp_channel_t) ((((int) 'M') << 8) | ((int) 'Q')))
113
114 typedef int (*amqp_output_fn_t)(void *context, void *buffer, size_t count);
115
116 /* Opaque struct. */
117 typedef struct amqp_connection_state_t_ *amqp_connection_state_t;
118
119 extern char const *amqp_version(void);
120
121 extern void init_amqp_pool(amqp_pool_t *pool, size_t pagesize);
122 extern void recycle_amqp_pool(amqp_pool_t *pool);
123 extern void empty_amqp_pool(amqp_pool_t *pool);
124
125 extern void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount);
126 extern void amqp_pool_alloc_bytes(amqp_pool_t *pool, size_t amount, amqp_bytes_t *output);
127
128 extern amqp_bytes_t amqp_cstring_bytes(char const *cstr);
129 extern amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src);
130
131 #define AMQP_BYTES_FREE(b)                      \
132   ({                                            \
133     if ((b).bytes != NULL) {                    \
134       free((b).bytes);                          \
135       (b).bytes = NULL;                         \
136     }                                           \
137   })
138
139 extern amqp_connection_state_t amqp_new_connection(void);
140 extern int amqp_get_sockfd(amqp_connection_state_t state);
141 extern void amqp_set_sockfd(amqp_connection_state_t state,
142                             int sockfd);
143 extern int amqp_tune_connection(amqp_connection_state_t state,
144                                 int channel_max,
145                                 int frame_max,
146                                 int heartbeat);
147 int amqp_get_channel_max(amqp_connection_state_t state);
148 extern void amqp_destroy_connection(amqp_connection_state_t state);
149
150 extern int amqp_handle_input(amqp_connection_state_t state,
151                              amqp_bytes_t received_data,
152                              amqp_frame_t *decoded_frame);
153
154 extern amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state);
155
156 extern void amqp_release_buffers(amqp_connection_state_t state);
157
158 extern void amqp_maybe_release_buffers(amqp_connection_state_t state);
159
160 extern int amqp_send_frame(amqp_connection_state_t state,
161                            amqp_frame_t const *frame);
162 extern int amqp_send_frame_to(amqp_connection_state_t state,
163                               amqp_frame_t const *frame,
164                               amqp_output_fn_t fn,
165                               void *context);
166
167 extern int amqp_table_entry_cmp(void const *entry1, void const *entry2);
168
169 extern int amqp_open_socket(char const *hostname, int portnumber);
170
171 extern int amqp_send_header(amqp_connection_state_t state);
172 extern int amqp_send_header_to(amqp_connection_state_t state,
173                                amqp_output_fn_t fn,
174                                void *context);
175
176 extern amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state);
177
178 extern int amqp_simple_wait_frame(amqp_connection_state_t state,
179                                   amqp_frame_t *decoded_frame);
180
181 extern int amqp_simple_wait_method(amqp_connection_state_t state,
182                                    amqp_channel_t expected_channel,
183                                    amqp_method_number_t expected_method,
184                                    amqp_method_t *output);
185
186 extern int amqp_send_method(amqp_connection_state_t state,
187                             amqp_channel_t channel,
188                             amqp_method_number_t id,
189                             void *decoded);
190
191 extern amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
192                                         amqp_channel_t channel,
193                                         amqp_method_number_t request_id,
194                                         amqp_method_number_t *expected_reply_ids,
195                                         void *decoded_request_method);
196
197 #define AMQP_EXPAND_METHOD(classname, methodname) (AMQP_ ## classname ## _ ## methodname ## _METHOD)
198
199 #define AMQP_SIMPLE_RPC(state, channel, classname, requestname, replyname, structname, ...) \
200   ({                                                                    \
201     structname _simple_rpc_request___ = (structname) { __VA_ARGS__ };   \
202     amqp_method_number_t _replies__[2] = { AMQP_EXPAND_METHOD(classname, replyname), 0}; \
203     amqp_simple_rpc(state, channel,                                     \
204                     AMQP_EXPAND_METHOD(classname, requestname), \
205                     (amqp_method_number_t *)&_replies__,        \
206                     &_simple_rpc_request___);                           \
207   })
208
209 #define AMQP_MULTIPLE_RESPONSE_RPC(state, channel, classname, requestname, replynames, structname, ...) \
210   ({                                                                    \
211     structname _simple_rpc_request___ = (structname) { __VA_ARGS__ };   \
212     amqp_simple_rpc(state, channel,                                     \
213                     AMQP_EXPAND_METHOD(classname, requestname), \
214                     replynames, \
215                     &_simple_rpc_request___);                           \
216   })
217
218
219 extern amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
220                                    char const *vhost,
221                                    int channel_max,
222                                    int frame_max,
223                                    int heartbeat,
224                                    amqp_sasl_method_enum sasl_method, ...);
225
226 extern struct amqp_channel_open_ok_t_ *amqp_channel_open(amqp_connection_state_t state,
227                                                          amqp_channel_t channel);
228
229 struct amqp_basic_properties_t_;
230 extern int amqp_basic_publish(amqp_connection_state_t state,
231                               amqp_channel_t channel,
232                               amqp_bytes_t exchange,
233                               amqp_bytes_t routing_key,
234                               amqp_boolean_t mandatory,
235                               amqp_boolean_t immediate,
236                               struct amqp_basic_properties_t_ const *properties,
237                               amqp_bytes_t body);
238
239 extern amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state,
240                                            amqp_channel_t channel,
241                                            int code);
242 extern amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state,
243                                               int code);
244
245 extern struct amqp_exchange_declare_ok_t_ *amqp_exchange_declare(amqp_connection_state_t state,
246                                                                  amqp_channel_t channel,
247                                                                  amqp_bytes_t exchange,
248                                                                  amqp_bytes_t type,
249                                                                  amqp_boolean_t passive,
250                                                                  amqp_boolean_t durable,
251                                                                  amqp_boolean_t auto_delete,
252                                                                  amqp_table_t arguments);
253
254 extern struct amqp_queue_declare_ok_t_ *amqp_queue_declare(amqp_connection_state_t state,
255                                                            amqp_channel_t channel,
256                                                            amqp_bytes_t queue,
257                                                            amqp_boolean_t passive,
258                                                            amqp_boolean_t durable,
259                                                            amqp_boolean_t exclusive,
260                                                            amqp_boolean_t auto_delete,
261                                                            amqp_table_t arguments);
262
263 extern struct amqp_queue_bind_ok_t_ *amqp_queue_bind(amqp_connection_state_t state,
264                                                      amqp_channel_t channel,
265                                                      amqp_bytes_t queue,
266                                                      amqp_bytes_t exchange,
267                                                      amqp_bytes_t routing_key,
268                                                      amqp_table_t arguments);
269
270 extern struct amqp_queue_unbind_ok_t_ *amqp_queue_unbind(amqp_connection_state_t state,
271                                                          amqp_channel_t channel,
272                                                          amqp_bytes_t queue,
273                                                          amqp_bytes_t exchange,
274                                                          amqp_bytes_t binding_key,
275                                                          amqp_table_t arguments);
276
277 extern struct amqp_basic_consume_ok_t_ *amqp_basic_consume(amqp_connection_state_t state,
278                                                            amqp_channel_t channel,
279                                                            amqp_bytes_t queue,
280                                                            amqp_bytes_t consumer_tag,
281                                                            amqp_boolean_t no_local,
282                                                            amqp_boolean_t no_ack,
283                                                            amqp_boolean_t exclusive);
284
285 extern int amqp_basic_ack(amqp_connection_state_t state,
286                           amqp_channel_t channel,
287                           uint64_t delivery_tag,
288                           amqp_boolean_t multiple);
289
290 extern amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state,
291           amqp_channel_t channel,
292           amqp_bytes_t queue,
293           amqp_boolean_t no_ack);
294
295 extern struct amqp_queue_purge_ok_t_ *amqp_queue_purge(amqp_connection_state_t state,
296             amqp_channel_t channel,
297             amqp_bytes_t queue,
298             amqp_boolean_t no_wait);
299
300 extern struct amqp_tx_select_ok_t_ *amqp_tx_select(amqp_connection_state_t state,
301             amqp_channel_t channel,
302             amqp_table_t arguments);
303
304 extern struct amqp_tx_commit_ok_t_ *amqp_tx_commit(amqp_connection_state_t state,
305             amqp_channel_t channel,
306             amqp_table_t arguments);
307
308 extern struct amqp_tx_rollback_ok_t_ *amqp_tx_rollback(amqp_connection_state_t state,
309             amqp_channel_t channel,
310             amqp_table_t arguments);
311
312 extern struct amqp_basic_return_t_ *amqp_basic_return(amqp_connection_state_t state,
313             amqp_channel_t channel,
314             amqp_table_t arguments);
315
316 /*
317  * Can be used to see if there is data still in the buffer, if so
318  * calling amqp_simple_wait_frame will not immediately enter a
319  * blocking read.
320  *
321  * Possibly amqp_frames_enqueued should be used for this?
322  */
323 extern amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state);
324
325 /*
326  * Expose amqp_rpc_reply to libraries.
327  */
328 extern amqp_rpc_reply_t *amqp_get_rpc_reply(void);
329
330 #ifdef __cplusplus
331 }
332 #endif
333
334 #endif
Note: See TracBrowser for help on using the browser.