root/perl/JLog.xs

Revision 5b33ecc14d5b2a8f633fd8b6427085395d985fa0, 8.8 kB (checked in by George Schlossnagle <george@omniti.com>, 6 years ago)

Allow write() to take an optional timestamp to make
useof the jlog_ctx_write_message() API.

  • Property mode set to 100644
Line 
1 #include "EXTERN.h"
2 #include "perl.h"
3 #include "XSUB.h"
4
5 #include "ppport.h"
6 #include "jlog.h"
7
8 typedef struct {
9   jlog_ctx *ctx;
10   char *path;
11   jlog_id start;
12   jlog_id last;
13   jlog_id end;
14   int auto_checkpoint;
15 } jlog_obj;
16
17 typedef jlog_obj * JLog;
18 typedef jlog_obj * JLog_Writer;
19 typedef jlog_obj * JLog_Reader;
20
21 #define FREE_JLOG_OBJ(my_obj) do { \
22   if(my_obj->ctx) { \
23     jlog_ctx_close(my_obj->ctx); \
24   } \
25   if(my_obj->path){ \
26     free(my_obj->path); \
27   } \
28   free(my_obj); \
29 } while(0)
30
31 #define SYS_CROAK(message) do { \
32   croak(message "; error: %d (%s) errno: %d (%s)", \
33     jlog_ctx_err(my_obj->ctx), jlog_ctx_err_string(my_obj->ctx), \
34     jlog_ctx_errno(my_obj->ctx), strerror(jlog_ctx_errno(my_obj->ctx))); \
35 } while (0)
36
37 MODULE = JLog           PACKAGE = JLog PREFIX=JLOG_
38
39 SV *JLOG_new(classname, path, ...)
40   char *classname;
41   char *path;
42   CODE:
43     {
44       jlog_obj *my_obj;
45       int options = O_CREAT;
46       size_t size = 0;
47       my_obj = calloc(1, sizeof(*my_obj));
48       my_obj->ctx = jlog_new(path);
49       my_obj->path = strdup(path);
50       if(items > 2) {
51         options = SvIV(ST(2));
52         if(items > 3) {
53           size = SvIV(ST(3));
54         }
55       }
56
57       if(!my_obj->ctx) {
58         FREE_JLOG_OBJ(my_obj);
59         croak("jlog_new(%s) failed", path);
60       }
61       if(options & O_CREAT) {
62         if(size) {
63           jlog_ctx_alter_journal_size(my_obj->ctx, size);
64         }
65         if(jlog_ctx_init(my_obj->ctx) != 0) {
66           if(jlog_ctx_err(my_obj->ctx) == JLOG_ERR_CREATE_EXISTS) {
67             if(options & O_EXCL) {
68               FREE_JLOG_OBJ(my_obj);
69               croak("file already exists: %s", path);
70             }
71           } else {
72             int err = jlog_ctx_err(my_obj->ctx);
73             const char *err_string = jlog_ctx_err_string(my_obj->ctx);
74             FREE_JLOG_OBJ(my_obj);
75             croak("error initializing jlog: %d %s", err, err_string);
76           }
77         }
78         jlog_ctx_close(my_obj->ctx);
79         my_obj->ctx = jlog_new(path);
80         if(!my_obj->ctx) {
81           FREE_JLOG_OBJ(my_obj);
82           croak("jlog_new(%s) failed after successful init", path);
83         }
84       }
85       RETVAL = newSV(0);
86       sv_setref_pv(RETVAL, classname, (void *)my_obj);
87     }
88   OUTPUT:
89     RETVAL
90
91 SV *JLOG_JLOG_BEGIN()
92   CODE:
93     {
94       RETVAL = newSViv(JLOG_BEGIN);
95     }
96   OUTPUT:
97     RETVAL
98    
99 SV *JLOG_JLOG_END()
100   CODE:
101     {
102       RETVAL = newSViv(JLOG_END);
103     }
104   OUTPUT:
105     RETVAL
106
107
108 SV *JLOG_add_subscriber(my_obj, subscriber, ...)
109   JLog my_obj;
110   char *subscriber;
111   CODE:
112     {
113       int whence = JLOG_BEGIN;
114       if(items > 2) {
115         whence = SvIV(ST(2));
116       }
117       if(!my_obj || !my_obj->ctx ||
118          jlog_ctx_add_subscriber(my_obj->ctx, subscriber, whence) != 0)
119       {
120         RETVAL = &PL_sv_no;
121       } else {
122         RETVAL = &PL_sv_yes;
123       }
124     }
125   OUTPUT:
126     RETVAL
127
128 SV *JLOG_remove_subscriber(my_obj, subscriber)
129   JLog my_obj;
130   char *subscriber;
131   CODE:
132     {
133       if(!my_obj || !my_obj->ctx ||
134          jlog_ctx_remove_subscriber(my_obj->ctx, subscriber) != 0)
135       {
136         RETVAL = &PL_sv_no;
137       } else {
138         RETVAL = &PL_sv_yes;
139       }
140     }
141   OUTPUT:
142     RETVAL
143
144 void JLOG_list_subscribers(my_obj)
145   JLog my_obj;
146   PPCODE:
147     {
148       char **list;
149       int i;
150       if(!my_obj || !my_obj->ctx) {
151         croak("invalid jlog context");
152       }
153       jlog_ctx_list_subscribers(my_obj->ctx, &list);
154       for(i=0; list[i]; i++) {
155         XPUSHs(sv_2mortal(newSVpv(list[i], 0)));
156       }
157       jlog_ctx_list_subscribers_dispose(my_obj->ctx, list);
158     }
159
160 SV *JLOG_alter_journal_size(my_obj, size)
161   JLog my_obj;
162   size_t size;
163   CODE:
164     {
165       if(!my_obj || !my_obj->ctx) {
166         croak("invalid jlog context");
167       }
168       /* calling jlog_ctx_alter_journal_size here will never have any
169        * effect, it's either too late or too early. Make this return
170        * failure and deprecate it */
171       RETVAL = &PL_sv_no;
172     }
173   OUTPUT:
174     RETVAL
175
176 SV *JLOG_raw_size(my_obj)
177   JLog my_obj;
178   CODE:
179     {
180       size_t size;
181       if(!my_obj || !my_obj->ctx) {
182         croak("invalid jlog context");
183       }
184       size = jlog_raw_size(my_obj->ctx);
185       RETVAL = newSViv(size);
186     }
187   OUTPUT:
188     RETVAL
189  
190 void JLOG_close(my_obj)
191   JLog my_obj;
192   CODE:
193     {
194       if(!my_obj || !my_obj->ctx) { return; }
195       jlog_ctx_close(my_obj->ctx);
196       my_obj->ctx = NULL;
197     }
198
199 void JLOG_DESTROY(my_obj)
200   JLog my_obj;
201   CODE:
202     {
203       if(!my_obj) return;
204       FREE_JLOG_OBJ(my_obj);
205     }
206
207
208 MODULE = JLog PACKAGE = JLog::Writer PREFIX=JLOG_W_
209
210
211 SV *JLOG_W_open(my_obj)
212   JLog_Writer my_obj;
213   CODE:
214     {
215       if(!my_obj || !my_obj->ctx) {
216         croak("invalid jlog context");
217       }
218       if(jlog_ctx_open_writer(my_obj->ctx) != 0) {
219         SYS_CROAK("jlog_ctx_open_writer failed");
220       } else {
221         RETVAL = newSVsv(ST(0));
222       }
223     }
224   OUTPUT:
225     RETVAL
226
227 SV *JLOG_W_write(my_obj, buffer_sv, ...)
228   JLog_Writer my_obj;
229   SV *buffer_sv;
230   CODE:
231     {
232       char *buffer;
233       int ts = 0;
234       jlog_message m;
235       struct timeval t;
236       STRLEN buffer_len;
237
238       if(!my_obj || !my_obj->ctx) {
239         croak("invalid jlog context");
240       }
241       if(items > 2) {
242         ts = (time_t) SvIV(ST(2));
243       }
244
245       buffer = SvPVx(buffer_sv, buffer_len);
246       m.mess = buffer;
247       m.mess_len = buffer_len;
248       t.tv_sec = ts;
249       t.tv_usec = 0;
250
251       if(jlog_ctx_write_message(my_obj->ctx, &m, ts?&t:NULL) < 0) {
252         RETVAL = &PL_sv_no;
253       } else {
254         RETVAL = &PL_sv_yes;
255       }
256     }
257   OUTPUT:
258     RETVAL
259
260
261 MODULE = JLog PACKAGE = JLog::Reader PREFIX=JLOG_R_
262
263
264 SV *JLOG_R_open(my_obj, subscriber)
265   JLog_Reader my_obj;
266   char *subscriber;
267   CODE:
268     {
269       if(!my_obj || !my_obj->ctx) {
270         croak("invalid jlog context");
271       }
272       if(jlog_ctx_open_reader(my_obj->ctx, subscriber) != 0) {
273         SYS_CROAK("jlog_ctx_open_reader failed");
274       } else {
275         RETVAL = newSVsv(ST(0));
276       }
277     }
278   OUTPUT:
279     RETVAL
280
281 SV * JLOG_R_read(my_obj)
282   JLog_Reader my_obj;
283   CODE:
284     {
285       const jlog_id epoch = { 0, 0 };
286       jlog_id cur;
287       jlog_message message;
288       int cnt;
289       if(!my_obj || !my_obj->ctx) {
290         croak("invalid jlog context");
291       }
292       /* if start is unset, we need to read the interval (again) */
293       if(!memcmp(&my_obj->start, &epoch, sizeof(jlog_id)))
294       {
295         cnt = jlog_ctx_read_interval(my_obj->ctx, &my_obj->start, &my_obj->end);
296         if(cnt == -1) SYS_CROAK("jlog_ctx_read_interval failed");
297         if(cnt == 0) {
298           my_obj->start = epoch;
299           my_obj->end = epoch;
300           RETVAL = &PL_sv_undef;
301           goto end;
302         }
303       }
304       /* if last is unset, start at the beginning */
305       if(!memcmp(&my_obj->last, &epoch, sizeof(jlog_id))) {
306         cur = my_obj->start;
307       } else {
308         /* if we've already read the end, return; otherwise advance */
309         if (!memcmp(&my_obj->last, &my_obj->end, sizeof(jlog_id))) {
310           my_obj->start = epoch;
311           my_obj->end = epoch;
312           RETVAL = &PL_sv_undef;
313           goto end;
314         } else {
315           cur = my_obj->last;
316           JLOG_ID_ADVANCE(&cur);
317         }
318       }
319
320       if(jlog_ctx_read_message(my_obj->ctx, &cur, &message) != 0) {
321         /* read failed; croak, but recover if the read is retried */
322         my_obj->start = epoch;
323         my_obj->last = epoch;
324         my_obj->end = epoch;
325         SYS_CROAK("read failed");
326       }
327       if(my_obj->auto_checkpoint) {
328         if(jlog_ctx_read_checkpoint(my_obj->ctx, &cur) != 0)
329           SYS_CROAK("checkpoint failed");
330         /* we have to re-read the interval after a checkpoint */
331         my_obj->last = epoch;
332         my_obj->start = epoch;
333         my_obj->end = epoch;
334       } else {
335         /* update last */
336         my_obj->last = cur;
337         /* if we've reaached the end, clear interval so we'll re-read it */
338         if(!memcmp(&my_obj->last, &my_obj->end, sizeof(jlog_id))) {
339           my_obj->start = epoch;
340           my_obj->end = epoch;
341         }
342       }
343       RETVAL = newSVpv(message.mess, message.mess_len);
344 end:
345       ;
346     }
347   OUTPUT:
348     RETVAL
349
350 SV *JLOG_R_checkpoint(my_obj)
351   JLog_Reader my_obj;
352   CODE:
353     {
354       jlog_id epoch = { 0, 0 };
355       if(!my_obj || !my_obj->ctx) {
356         croak("invalid jlog context");
357       }
358       if(memcmp(&my_obj->last, &epoch, sizeof(jlog_id)))
359       {
360         jlog_ctx_read_checkpoint(my_obj->ctx, &my_obj->last);
361         /* we have to re-read the interval after a checkpoint */
362         my_obj->last = epoch;
363         my_obj->start = epoch;
364         my_obj->end = epoch;
365       }
366       RETVAL = newSVsv(ST(0));
367     }
368   OUTPUT:
369     RETVAL
370
371 SV *JLOG_R_auto_checkpoint(my_obj, ...)
372   JLog_Reader my_obj;
373   CODE:
374     {
375       if(!my_obj || !my_obj->ctx) {
376         croak("invalid jlog context");
377       }
378       if(items > 1) {
379         int ac = SvIV(ST(1));
380         my_obj->auto_checkpoint = ac;
381       }
382       RETVAL = newSViv(my_obj->auto_checkpoint);
383     }
384   OUTPUT:
385     RETVAL
Note: See TracBrowser for help on using the browser.