root/perl/JLog.xs

Revision 940d6415d15895c83800d28cce0195fda0cc6b86, 10.2 kB (checked in by George Schlossnagle <george@omniti.com>, 5 years ago)

I'm liking this better. Maintain our current position if we et this sort of error,
we'll fall into the error case in the next read() and all will be good.

  • Property mode set to 100644
Line 
1 #include "EXTERN.h"
2 #include "perl.h"
3 #include "XSUB.h"
4
5 #include "ppport.h"
6 #include "jlog.h"
7
8 typedef struct {
9   jlog_ctx *ctx;
10   char *path;
11   jlog_id start;
12   jlog_id last;
13   jlog_id prev;
14   jlog_id end;
15   int auto_checkpoint;
16   int error;
17 } jlog_obj;
18
19 typedef jlog_obj * JLog;
20 typedef jlog_obj * JLog_Writer;
21 typedef jlog_obj * JLog_Reader;
22
23 #define FREE_JLOG_OBJ(my_obj) do { \
24   if(my_obj->ctx) { \
25     jlog_ctx_close(my_obj->ctx); \
26   } \
27   if(my_obj->path){ \
28     free(my_obj->path); \
29   } \
30   free(my_obj); \
31 } while(0)
32
33 #define SYS_CROAK(message) do { \
34   croak(message "; error: %d (%s) errno: %d (%s)", \
35     jlog_ctx_err(my_obj->ctx), jlog_ctx_err_string(my_obj->ctx), \
36     jlog_ctx_errno(my_obj->ctx), strerror(jlog_ctx_errno(my_obj->ctx))); \
37 } while (0)
38
39 MODULE = JLog           PACKAGE = JLog PREFIX=JLOG_
40
41 SV *JLOG_new(classname, path, ...)
42   char *classname;
43   char *path;
44   CODE:
45     {
46       jlog_obj *my_obj;
47       int options = O_CREAT;
48       size_t size = 0;
49       my_obj = calloc(1, sizeof(*my_obj));
50       my_obj->ctx = jlog_new(path);
51       my_obj->path = strdup(path);
52       if(items > 2) {
53         options = SvIV(ST(2));
54         if(items > 3) {
55           size = SvIV(ST(3));
56         }
57       }
58
59       if(!my_obj->ctx) {
60         FREE_JLOG_OBJ(my_obj);
61         croak("jlog_new(%s) failed", path);
62       }
63       if(options & O_CREAT) {
64         if(size) {
65           jlog_ctx_alter_journal_size(my_obj->ctx, size);
66         }
67         if(jlog_ctx_init(my_obj->ctx) != 0) {
68           if(jlog_ctx_err(my_obj->ctx) == JLOG_ERR_CREATE_EXISTS) {
69             if(options & O_EXCL) {
70               FREE_JLOG_OBJ(my_obj);
71               croak("file already exists: %s", path);
72             }
73           } else {
74             int err = jlog_ctx_err(my_obj->ctx);
75             const char *err_string = jlog_ctx_err_string(my_obj->ctx);
76             FREE_JLOG_OBJ(my_obj);
77             croak("error initializing jlog: %d %s", err, err_string);
78           }
79         }
80         jlog_ctx_close(my_obj->ctx);
81         my_obj->ctx = jlog_new(path);
82         if(!my_obj->ctx) {
83           FREE_JLOG_OBJ(my_obj);
84           croak("jlog_new(%s) failed after successful init", path);
85         }
86       }
87       RETVAL = newSV(0);
88       sv_setref_pv(RETVAL, classname, (void *)my_obj);
89     }
90   OUTPUT:
91     RETVAL
92
93 SV *JLOG_JLOG_BEGIN()
94   CODE:
95     {
96       RETVAL = newSViv(JLOG_BEGIN);
97     }
98   OUTPUT:
99     RETVAL
100    
101 SV *JLOG_JLOG_END()
102   CODE:
103     {
104       RETVAL = newSViv(JLOG_END);
105     }
106   OUTPUT:
107     RETVAL
108
109
110 SV *JLOG_add_subscriber(my_obj, subscriber, ...)
111   JLog my_obj;
112   char *subscriber;
113   CODE:
114     {
115       int whence = JLOG_BEGIN;
116       if(items > 2) {
117         whence = SvIV(ST(2));
118       }
119       if(!my_obj || !my_obj->ctx ||
120          jlog_ctx_add_subscriber(my_obj->ctx, subscriber, whence) != 0)
121       {
122         RETVAL = &PL_sv_no;
123       } else {
124         RETVAL = &PL_sv_yes;
125       }
126     }
127   OUTPUT:
128     RETVAL
129
130 SV *JLOG_remove_subscriber(my_obj, subscriber)
131   JLog my_obj;
132   char *subscriber;
133   CODE:
134     {
135       if(!my_obj || !my_obj->ctx ||
136          jlog_ctx_remove_subscriber(my_obj->ctx, subscriber) != 0)
137       {
138         RETVAL = &PL_sv_no;
139       } else {
140         RETVAL = &PL_sv_yes;
141       }
142     }
143   OUTPUT:
144     RETVAL
145
146 void JLOG_list_subscribers(my_obj)
147   JLog my_obj;
148   PPCODE:
149     {
150       char **list;
151       int i;
152       if(!my_obj || !my_obj->ctx) {
153         croak("invalid jlog context");
154       }
155       jlog_ctx_list_subscribers(my_obj->ctx, &list);
156       for(i=0; list[i]; i++) {
157         XPUSHs(sv_2mortal(newSVpv(list[i], 0)));
158       }
159       jlog_ctx_list_subscribers_dispose(my_obj->ctx, list);
160     }
161
162 SV *JLOG_alter_journal_size(my_obj, size)
163   JLog my_obj;
164   size_t size;
165   CODE:
166     {
167       if(!my_obj || !my_obj->ctx) {
168         croak("invalid jlog context");
169       }
170       /* calling jlog_ctx_alter_journal_size here will never have any
171        * effect, it's either too late or too early. Make this return
172        * failure and deprecate it */
173       RETVAL = &PL_sv_no;
174     }
175   OUTPUT:
176     RETVAL
177
178 SV *JLOG_raw_size(my_obj)
179   JLog my_obj;
180   CODE:
181     {
182       size_t size;
183       if(!my_obj || !my_obj->ctx) {
184         croak("invalid jlog context");
185       }
186       size = jlog_raw_size(my_obj->ctx);
187       RETVAL = newSViv(size);
188     }
189   OUTPUT:
190     RETVAL
191  
192 void JLOG_close(my_obj)
193   JLog my_obj;
194   CODE:
195     {
196       if(!my_obj || !my_obj->ctx) { return; }
197       jlog_ctx_close(my_obj->ctx);
198       my_obj->ctx = NULL;
199     }
200
201 SV* JLOG_inspect(my_obj)
202   JLog my_obj;
203   CODE:
204     {
205       HV *rh;
206       char start[20], last[20], prev[20], end[20];
207       rh = (HV *)sv_2mortal((SV *)newHV());
208       jlog_snprint_logid(start, sizeof(start), &my_obj->start);
209       hv_store(rh, "start", sizeof("start") - 1, newSVpv(start, 0), 0);
210
211       jlog_snprint_logid(last, sizeof(last), &my_obj->last);
212       hv_store(rh, "last", sizeof("last") - 1, newSVpv(last, 0), 0);
213
214       jlog_snprint_logid(prev, sizeof(prev), &my_obj->prev);
215       hv_store(rh, "prev", sizeof("prev") - 1, newSVpv(prev, 0), 0);
216
217       jlog_snprint_logid(end, sizeof(end), &my_obj->end);
218       hv_store(rh, "end", sizeof("end") - 1, newSVpv(end, 0), 0);
219
220       hv_store(rh, "path", sizeof("path") - 1, newSVpv(my_obj->path, 0), 0);
221       RETVAL = newRV((SV *)rh);
222     }
223   OUTPUT:
224     RETVAL
225
226 void JLOG_DESTROY(my_obj)
227   JLog my_obj;
228   CODE:
229     {
230       if(!my_obj) return;
231       FREE_JLOG_OBJ(my_obj);
232     }
233
234
235 MODULE = JLog PACKAGE = JLog::Writer PREFIX=JLOG_W_
236
237
238 SV *JLOG_W_open(my_obj)
239   JLog_Writer my_obj;
240   CODE:
241     {
242       if(!my_obj || !my_obj->ctx) {
243         croak("invalid jlog context");
244       }
245       if(jlog_ctx_open_writer(my_obj->ctx) != 0) {
246         SYS_CROAK("jlog_ctx_open_writer failed");
247       } else {
248         RETVAL = newSVsv(ST(0));
249       }
250     }
251   OUTPUT:
252     RETVAL
253
254 SV *JLOG_W_write(my_obj, buffer_sv, ...)
255   JLog_Writer my_obj;
256   SV *buffer_sv;
257   CODE:
258     {
259       char *buffer;
260       int ts = 0;
261       jlog_message m;
262       struct timeval t;
263       STRLEN buffer_len;
264
265       if(!my_obj || !my_obj->ctx) {
266         croak("invalid jlog context");
267       }
268       if(items > 2) {
269         ts = (time_t) SvIV(ST(2));
270       }
271
272       buffer = SvPVx(buffer_sv, buffer_len);
273       m.mess = buffer;
274       m.mess_len = buffer_len;
275       t.tv_sec = ts;
276       t.tv_usec = 0;
277
278       if(jlog_ctx_write_message(my_obj->ctx, &m, ts?&t:NULL) < 0) {
279         RETVAL = &PL_sv_no;
280       } else {
281         RETVAL = &PL_sv_yes;
282       }
283     }
284   OUTPUT:
285     RETVAL
286
287
288 MODULE = JLog PACKAGE = JLog::Reader PREFIX=JLOG_R_
289
290
291 SV *JLOG_R_open(my_obj, subscriber)
292   JLog_Reader my_obj;
293   char *subscriber;
294   CODE:
295     {
296       if(!my_obj || !my_obj->ctx) {
297         croak("invalid jlog context");
298       }
299       if(jlog_ctx_open_reader(my_obj->ctx, subscriber) != 0) {
300         SYS_CROAK("jlog_ctx_open_reader failed");
301       } else {
302         RETVAL = newSVsv(ST(0));
303       }
304     }
305   OUTPUT:
306     RETVAL
307
308 SV * JLOG_R_read(my_obj)
309   JLog_Reader my_obj;
310   CODE:
311     {
312       const jlog_id epoch = { 0, 0 };
313       jlog_id cur;
314       jlog_message message;
315       int cnt;
316       if(!my_obj || !my_obj->ctx) {
317         croak("invalid jlog context");
318       }
319       /* if start is unset, we need to read the interval (again) */
320       if(my_obj->error || !memcmp(&my_obj->start, &epoch, sizeof(jlog_id)))
321       {
322         my_obj->error = 0;
323         cnt = jlog_ctx_read_interval(my_obj->ctx, &my_obj->start, &my_obj->end);
324         if(cnt == 0 || (cnt == -1 && jlog_ctx_err(my_obj->ctx) == JLOG_ERR_FILE_OPEN)) {
325           my_obj->start = epoch;
326           my_obj->end = epoch;
327           RETVAL = &PL_sv_undef;
328           goto end;
329         }
330         else if(cnt == -1) SYS_CROAK("jlog_ctx_read_interval failed");
331       }
332       /* if last is unset, start at the beginning */
333       if(!memcmp(&my_obj->last, &epoch, sizeof(jlog_id))) {
334         cur = my_obj->start;
335       } else {
336         /* if we've already read the end, return; otherwise advance */
337         cur = my_obj->last;
338         if(!memcmp(&my_obj->prev, &my_obj->end, sizeof(jlog_id))) {
339           my_obj->start = epoch;
340           my_obj->end = epoch;
341           RETVAL = &PL_sv_undef;
342           goto end;
343         }
344         jlog_ctx_advance_id(my_obj->ctx, &my_obj->last, &cur, &my_obj->end);
345         if(!memcmp(&my_obj->last, &cur, sizeof(jlog_id))) {
346           my_obj->start = epoch;
347           my_obj->end = epoch;
348           RETVAL = &PL_sv_undef;
349           goto end;
350         }
351       }
352       if(jlog_ctx_read_message(my_obj->ctx, &cur, &message) != 0) {
353         if(jlog_ctx_err(my_obj->ctx) == JLOG_ERR_FILE_OPEN) {
354           my_obj->error = 1;
355           RETVAL = &PL_sv_undef;
356           goto end;
357         }
358         /* read failed; croak, but recover if the read is retried */
359         my_obj->error = 1;
360         SYS_CROAK("read failed");
361       }
362       if(my_obj->auto_checkpoint) {
363         if(jlog_ctx_read_checkpoint(my_obj->ctx, &cur) != 0)
364           SYS_CROAK("checkpoint failed");
365         /* we have to re-read the interval after a checkpoint */
366         my_obj->last = epoch;
367         my_obj->prev = epoch;
368         my_obj->start = epoch;
369         my_obj->end = epoch;
370       } else {
371         /* update last */
372         my_obj->prev = my_obj->last;
373         my_obj->last = cur;
374         /* if we've reaached the end, clear interval so we'll re-read it */
375       }
376       RETVAL = newSVpv(message.mess, message.mess_len);
377 end:
378       ;
379     }
380   OUTPUT:
381     RETVAL
382
383 SV *JLOG_R_rewind(my_obj)
384   JLog_Reader my_obj;
385   CODE:
386     {
387       if(!my_obj || !my_obj->ctx) {
388         croak("invalid jlog context");
389       }
390       my_obj->last = my_obj->prev;
391       RETVAL = newSVsv(ST(0));
392     }
393   OUTPUT:
394     RETVAL
395
396 SV *JLOG_R_checkpoint(my_obj)
397   JLog_Reader my_obj;
398   CODE:
399     {
400       jlog_id epoch = { 0, 0 };
401       if(!my_obj || !my_obj->ctx) {
402         croak("invalid jlog context");
403       }
404       if(memcmp(&my_obj->last, &epoch, sizeof(jlog_id)))
405       {
406         jlog_ctx_read_checkpoint(my_obj->ctx, &my_obj->last);
407         /* we have to re-read the interval after a checkpoint */
408         my_obj->last = epoch;
409         my_obj->start = epoch;
410         my_obj->end = epoch;
411       }
412       RETVAL = newSVsv(ST(0));
413     }
414   OUTPUT:
415     RETVAL
416
417 SV *JLOG_R_auto_checkpoint(my_obj, ...)
418   JLog_Reader my_obj;
419   CODE:
420     {
421       if(!my_obj || !my_obj->ctx) {
422         croak("invalid jlog context");
423       }
424       if(items > 1) {
425         int ac = SvIV(ST(1));
426         my_obj->auto_checkpoint = ac;
427       }
428       RETVAL = newSViv(my_obj->auto_checkpoint);
429     }
430   OUTPUT:
431     RETVAL
Note: See TracBrowser for help on using the browser.