root/perl/JLog.xs

Revision b6b39fc7383c59824010dffec2e64aa60a11a4b8, 9.8 kB (checked in by George Schlossnagle <george@omniti.com>, 5 years ago)

add function to advance and id. This allows for advancement
outside of one's current segment.

add support for this to the perl api

  • Property mode set to 100644
Line 
1 #include "EXTERN.h"
2 #include "perl.h"
3 #include "XSUB.h"
4
5 #include "ppport.h"
6 #include "jlog.h"
7
8 typedef struct {
9   jlog_ctx *ctx;
10   char *path;
11   jlog_id start;
12   jlog_id last;
13   jlog_id prev;
14   jlog_id end;
15   int auto_checkpoint;
16 } jlog_obj;
17
18 typedef jlog_obj * JLog;
19 typedef jlog_obj * JLog_Writer;
20 typedef jlog_obj * JLog_Reader;
21
22 #define FREE_JLOG_OBJ(my_obj) do { \
23   if(my_obj->ctx) { \
24     jlog_ctx_close(my_obj->ctx); \
25   } \
26   if(my_obj->path){ \
27     free(my_obj->path); \
28   } \
29   free(my_obj); \
30 } while(0)
31
32 #define SYS_CROAK(message) do { \
33   croak(message "; error: %d (%s) errno: %d (%s)", \
34     jlog_ctx_err(my_obj->ctx), jlog_ctx_err_string(my_obj->ctx), \
35     jlog_ctx_errno(my_obj->ctx), strerror(jlog_ctx_errno(my_obj->ctx))); \
36 } while (0)
37
38 MODULE = JLog           PACKAGE = JLog PREFIX=JLOG_
39
40 SV *JLOG_new(classname, path, ...)
41   char *classname;
42   char *path;
43   CODE:
44     {
45       jlog_obj *my_obj;
46       int options = O_CREAT;
47       size_t size = 0;
48       my_obj = calloc(1, sizeof(*my_obj));
49       my_obj->ctx = jlog_new(path);
50       my_obj->path = strdup(path);
51       if(items > 2) {
52         options = SvIV(ST(2));
53         if(items > 3) {
54           size = SvIV(ST(3));
55         }
56       }
57
58       if(!my_obj->ctx) {
59         FREE_JLOG_OBJ(my_obj);
60         croak("jlog_new(%s) failed", path);
61       }
62       if(options & O_CREAT) {
63         if(size) {
64           jlog_ctx_alter_journal_size(my_obj->ctx, size);
65         }
66         if(jlog_ctx_init(my_obj->ctx) != 0) {
67           if(jlog_ctx_err(my_obj->ctx) == JLOG_ERR_CREATE_EXISTS) {
68             if(options & O_EXCL) {
69               FREE_JLOG_OBJ(my_obj);
70               croak("file already exists: %s", path);
71             }
72           } else {
73             int err = jlog_ctx_err(my_obj->ctx);
74             const char *err_string = jlog_ctx_err_string(my_obj->ctx);
75             FREE_JLOG_OBJ(my_obj);
76             croak("error initializing jlog: %d %s", err, err_string);
77           }
78         }
79         jlog_ctx_close(my_obj->ctx);
80         my_obj->ctx = jlog_new(path);
81         if(!my_obj->ctx) {
82           FREE_JLOG_OBJ(my_obj);
83           croak("jlog_new(%s) failed after successful init", path);
84         }
85       }
86       RETVAL = newSV(0);
87       sv_setref_pv(RETVAL, classname, (void *)my_obj);
88     }
89   OUTPUT:
90     RETVAL
91
92 SV *JLOG_JLOG_BEGIN()
93   CODE:
94     {
95       RETVAL = newSViv(JLOG_BEGIN);
96     }
97   OUTPUT:
98     RETVAL
99    
100 SV *JLOG_JLOG_END()
101   CODE:
102     {
103       RETVAL = newSViv(JLOG_END);
104     }
105   OUTPUT:
106     RETVAL
107
108
109 SV *JLOG_add_subscriber(my_obj, subscriber, ...)
110   JLog my_obj;
111   char *subscriber;
112   CODE:
113     {
114       int whence = JLOG_BEGIN;
115       if(items > 2) {
116         whence = SvIV(ST(2));
117       }
118       if(!my_obj || !my_obj->ctx ||
119          jlog_ctx_add_subscriber(my_obj->ctx, subscriber, whence) != 0)
120       {
121         RETVAL = &PL_sv_no;
122       } else {
123         RETVAL = &PL_sv_yes;
124       }
125     }
126   OUTPUT:
127     RETVAL
128
129 SV *JLOG_remove_subscriber(my_obj, subscriber)
130   JLog my_obj;
131   char *subscriber;
132   CODE:
133     {
134       if(!my_obj || !my_obj->ctx ||
135          jlog_ctx_remove_subscriber(my_obj->ctx, subscriber) != 0)
136       {
137         RETVAL = &PL_sv_no;
138       } else {
139         RETVAL = &PL_sv_yes;
140       }
141     }
142   OUTPUT:
143     RETVAL
144
145 void JLOG_list_subscribers(my_obj)
146   JLog my_obj;
147   PPCODE:
148     {
149       char **list;
150       int i;
151       if(!my_obj || !my_obj->ctx) {
152         croak("invalid jlog context");
153       }
154       jlog_ctx_list_subscribers(my_obj->ctx, &list);
155       for(i=0; list[i]; i++) {
156         XPUSHs(sv_2mortal(newSVpv(list[i], 0)));
157       }
158       jlog_ctx_list_subscribers_dispose(my_obj->ctx, list);
159     }
160
161 SV *JLOG_alter_journal_size(my_obj, size)
162   JLog my_obj;
163   size_t size;
164   CODE:
165     {
166       if(!my_obj || !my_obj->ctx) {
167         croak("invalid jlog context");
168       }
169       /* calling jlog_ctx_alter_journal_size here will never have any
170        * effect, it's either too late or too early. Make this return
171        * failure and deprecate it */
172       RETVAL = &PL_sv_no;
173     }
174   OUTPUT:
175     RETVAL
176
177 SV *JLOG_raw_size(my_obj)
178   JLog my_obj;
179   CODE:
180     {
181       size_t size;
182       if(!my_obj || !my_obj->ctx) {
183         croak("invalid jlog context");
184       }
185       size = jlog_raw_size(my_obj->ctx);
186       RETVAL = newSViv(size);
187     }
188   OUTPUT:
189     RETVAL
190  
191 void JLOG_close(my_obj)
192   JLog my_obj;
193   CODE:
194     {
195       if(!my_obj || !my_obj->ctx) { return; }
196       jlog_ctx_close(my_obj->ctx);
197       my_obj->ctx = NULL;
198     }
199
200 SV* JLOG_inspect(my_obj)
201   JLog my_obj;
202   CODE:
203     {
204       HV *rh;
205       char start[20], last[20], prev[20], end[20];
206       rh = (HV *)sv_2mortal((SV *)newHV());
207       jlog_snprint_logid(start, sizeof(start), &my_obj->start);
208       hv_store(rh, "start", sizeof("start") - 1, newSVpv(start, 0), 0);
209
210       jlog_snprint_logid(last, sizeof(last), &my_obj->last);
211       hv_store(rh, "last", sizeof("last") - 1, newSVpv(last, 0), 0);
212
213       jlog_snprint_logid(prev, sizeof(prev), &my_obj->prev);
214       hv_store(rh, "prev", sizeof("prev") - 1, newSVpv(prev, 0), 0);
215
216       jlog_snprint_logid(end, sizeof(end), &my_obj->end);
217       hv_store(rh, "end", sizeof("end") - 1, newSVpv(end, 0), 0);
218
219       hv_store(rh, "path", sizeof("path") - 1, newSVpv(my_obj->path, 0), 0);
220       RETVAL = newRV((SV *)rh);
221     }
222   OUTPUT:
223     RETVAL
224
225 void JLOG_DESTROY(my_obj)
226   JLog my_obj;
227   CODE:
228     {
229       if(!my_obj) return;
230       FREE_JLOG_OBJ(my_obj);
231     }
232
233
234 MODULE = JLog PACKAGE = JLog::Writer PREFIX=JLOG_W_
235
236
237 SV *JLOG_W_open(my_obj)
238   JLog_Writer my_obj;
239   CODE:
240     {
241       if(!my_obj || !my_obj->ctx) {
242         croak("invalid jlog context");
243       }
244       if(jlog_ctx_open_writer(my_obj->ctx) != 0) {
245         SYS_CROAK("jlog_ctx_open_writer failed");
246       } else {
247         RETVAL = newSVsv(ST(0));
248       }
249     }
250   OUTPUT:
251     RETVAL
252
253 SV *JLOG_W_write(my_obj, buffer_sv, ...)
254   JLog_Writer my_obj;
255   SV *buffer_sv;
256   CODE:
257     {
258       char *buffer;
259       int ts = 0;
260       jlog_message m;
261       struct timeval t;
262       STRLEN buffer_len;
263
264       if(!my_obj || !my_obj->ctx) {
265         croak("invalid jlog context");
266       }
267       if(items > 2) {
268         ts = (time_t) SvIV(ST(2));
269       }
270
271       buffer = SvPVx(buffer_sv, buffer_len);
272       m.mess = buffer;
273       m.mess_len = buffer_len;
274       t.tv_sec = ts;
275       t.tv_usec = 0;
276
277       if(jlog_ctx_write_message(my_obj->ctx, &m, ts?&t:NULL) < 0) {
278         RETVAL = &PL_sv_no;
279       } else {
280         RETVAL = &PL_sv_yes;
281       }
282     }
283   OUTPUT:
284     RETVAL
285
286
287 MODULE = JLog PACKAGE = JLog::Reader PREFIX=JLOG_R_
288
289
290 SV *JLOG_R_open(my_obj, subscriber)
291   JLog_Reader my_obj;
292   char *subscriber;
293   CODE:
294     {
295       if(!my_obj || !my_obj->ctx) {
296         croak("invalid jlog context");
297       }
298       if(jlog_ctx_open_reader(my_obj->ctx, subscriber) != 0) {
299         SYS_CROAK("jlog_ctx_open_reader failed");
300       } else {
301         RETVAL = newSVsv(ST(0));
302       }
303     }
304   OUTPUT:
305     RETVAL
306
307 SV * JLOG_R_read(my_obj)
308   JLog_Reader my_obj;
309   CODE:
310     {
311       const jlog_id epoch = { 0, 0 };
312       jlog_id cur;
313       jlog_message message;
314       int cnt;
315       if(!my_obj || !my_obj->ctx) {
316         croak("invalid jlog context");
317       }
318       /* if start is unset, we need to read the interval (again) */
319       if(!memcmp(&my_obj->start, &epoch, sizeof(jlog_id)))
320       {
321         cnt = jlog_ctx_read_interval(my_obj->ctx, &my_obj->start, &my_obj->end);
322         if(cnt == -1) SYS_CROAK("jlog_ctx_read_interval failed");
323         if(cnt == 0) {
324           my_obj->start = epoch;
325           my_obj->end = epoch;
326           RETVAL = &PL_sv_undef;
327           goto end;
328         }
329       }
330       /* if last is unset, start at the beginning */
331       if(!memcmp(&my_obj->last, &epoch, sizeof(jlog_id))) {
332         cur = my_obj->start;
333       } else {
334         /* if we've already read the end, return; otherwise advance */
335         cur = my_obj->last;
336         if(!memcmp(&my_obj->prev, &my_obj->end, sizeof(jlog_id))) {
337           my_obj->start = epoch;
338           my_obj->end = epoch;
339           RETVAL = &PL_sv_undef;
340           goto end;
341         }
342         jlog_ctx_advance_id(my_obj->ctx, &my_obj->last, &cur, &my_obj->end);
343       }
344
345       if(jlog_ctx_read_message(my_obj->ctx, &cur, &message) != 0) {
346         /* read failed; croak, but recover if the read is retried */
347         my_obj->start = epoch;
348         my_obj->last = epoch;
349         my_obj->end = epoch;
350         SYS_CROAK("read failed");
351       }
352       if(my_obj->auto_checkpoint) {
353         if(jlog_ctx_read_checkpoint(my_obj->ctx, &cur) != 0)
354           SYS_CROAK("checkpoint failed");
355         /* we have to re-read the interval after a checkpoint */
356         my_obj->last = epoch;
357         my_obj->start = epoch;
358         my_obj->end = epoch;
359       } else {
360         /* update last */
361         my_obj->prev = my_obj->last;
362         my_obj->last = cur;
363         /* if we've reaached the end, clear interval so we'll re-read it */
364       }
365       RETVAL = newSVpv(message.mess, message.mess_len);
366 end:
367       ;
368     }
369   OUTPUT:
370     RETVAL
371
372 SV *JLOG_R_rewind(my_obj)
373   JLog_Reader my_obj;
374   CODE:
375     {
376       if(!my_obj || !my_obj->ctx) {
377         croak("invalid jlog context");
378       }
379       my_obj->last = my_obj->prev;
380       RETVAL = newSVsv(ST(0));
381     }
382   OUTPUT:
383     RETVAL
384
385 SV *JLOG_R_checkpoint(my_obj)
386   JLog_Reader my_obj;
387   CODE:
388     {
389       jlog_id epoch = { 0, 0 };
390       if(!my_obj || !my_obj->ctx) {
391         croak("invalid jlog context");
392       }
393       if(memcmp(&my_obj->last, &epoch, sizeof(jlog_id)))
394       {
395         jlog_ctx_read_checkpoint(my_obj->ctx, &my_obj->last);
396         /* we have to re-read the interval after a checkpoint */
397         my_obj->last = epoch;
398         my_obj->start = epoch;
399         my_obj->end = epoch;
400       }
401       RETVAL = newSVsv(ST(0));
402     }
403   OUTPUT:
404     RETVAL
405
406 SV *JLOG_R_auto_checkpoint(my_obj, ...)
407   JLog_Reader my_obj;
408   CODE:
409     {
410       if(!my_obj || !my_obj->ctx) {
411         croak("invalid jlog context");
412       }
413       if(items > 1) {
414         int ac = SvIV(ST(1));
415         my_obj->auto_checkpoint = ac;
416       }
417       RETVAL = newSViv(my_obj->auto_checkpoint);
418     }
419   OUTPUT:
420     RETVAL
Note: See TracBrowser for help on using the browser.