root/perl/JLog.xs

Revision 2adf675734a53374fc3d5457d0a1378bc94f9448, 10.2 kB (checked in by George Schlossnagle <george@omniti.com>, 5 years ago)

It looks like there's a potential race condition where the metastore can be updated by a writer before
its underlying storage is created. I don't know why we never ran into this before, but it seems we
should have. We can work around this in the Perl API, though.

  • Property mode set to 100644
Line 
1 #include "EXTERN.h"
2 #include "perl.h"
3 #include "XSUB.h"
4
5 #include "ppport.h"
6 #include "jlog.h"
7
8 typedef struct {
9   jlog_ctx *ctx;
10   char *path;
11   jlog_id start;
12   jlog_id last;
13   jlog_id prev;
14   jlog_id end;
15   int auto_checkpoint;
16   int error;
17 } jlog_obj;
18
19 typedef jlog_obj * JLog;
20 typedef jlog_obj * JLog_Writer;
21 typedef jlog_obj * JLog_Reader;
22
/*
 * Release everything owned by a jlog_obj: close the jlog context if one is
 * still open, then free the path copy and the object itself.
 *
 * The argument is parenthesized so that any pointer-valued expression
 * (e.g. `*slot`) may be passed. It is evaluated more than once, so it must
 * not have side effects. free(NULL) is a no-op, so `path` needs no guard.
 */
#define FREE_JLOG_OBJ(my_obj) do { \
  if((my_obj)->ctx) { \
    jlog_ctx_close((my_obj)->ctx); \
  } \
  free((my_obj)->path); \
  free((my_obj)); \
} while(0)
32
/*
 * croak() with `message` (a string literal) followed by the jlog error
 * code and string plus the saved errno and its strerror() text.
 * Relies on a variable named `my_obj` being in scope at the call site.
 */
#define SYS_CROAK(message) do { \
  jlog_ctx *sys_croak_ctx_ = my_obj->ctx; \
  croak(message "; error: %d (%s) errno: %d (%s)", \
    jlog_ctx_err(sys_croak_ctx_), \
    jlog_ctx_err_string(sys_croak_ctx_), \
    jlog_ctx_errno(sys_croak_ctx_), \
    strerror(jlog_ctx_errno(sys_croak_ctx_))); \
} while (0)
38
39 MODULE = JLog           PACKAGE = JLog PREFIX=JLOG_
40
SV *JLOG_new(classname, path, ...)
  char *classname;
  char *path;
  CODE:
    {
      /*
       * Constructor: JLog->new($path [, $options [, $size]]).
       *
       * $options defaults to O_CREAT. With O_CREAT the log is initialised
       * on disk (optionally with journal size $size); an already existing
       * log is only an error when O_EXCL is also given. After init the
       * context is closed and re-created so the returned handle is fresh.
       *
       * Returns a reference blessed into `classname`; croaks on failure.
       */
      jlog_obj *my_obj;
      int options = O_CREAT;
      size_t size = 0;

      /* Parse the optional arguments before allocating anything: SvIV can
       * croak, and nothing would free the object on that path. */
      if(items > 2) {
        options = SvIV(ST(2));
        if(items > 3) {
          size = SvIV(ST(3));
        }
      }

      my_obj = calloc(1, sizeof(*my_obj));
      if(!my_obj) {
        croak("jlog_new(%s) failed: out of memory", path);
      }
      my_obj->ctx = jlog_new(path);
      if(!my_obj->ctx) {
        FREE_JLOG_OBJ(my_obj);
        croak("jlog_new(%s) failed", path);
      }
      my_obj->path = strdup(path);
      if(!my_obj->path) {
        FREE_JLOG_OBJ(my_obj);
        croak("jlog_new(%s) failed: out of memory", path);
      }

      if(options & O_CREAT) {
        if(size) {
          jlog_ctx_alter_journal_size(my_obj->ctx, size);
        }
        if(jlog_ctx_init(my_obj->ctx) != 0) {
          if(jlog_ctx_err(my_obj->ctx) == JLOG_ERR_CREATE_EXISTS) {
            if(options & O_EXCL) {
              FREE_JLOG_OBJ(my_obj);
              croak("file already exists: %s", path);
            }
          } else {
            /* copy the error details out before the context is freed */
            int err = jlog_ctx_err(my_obj->ctx);
            const char *err_string = jlog_ctx_err_string(my_obj->ctx);
            FREE_JLOG_OBJ(my_obj);
            croak("error initializing jlog: %d %s", err, err_string);
          }
        }
        /* discard the context used for init and start over with a new one */
        jlog_ctx_close(my_obj->ctx);
        my_obj->ctx = jlog_new(path);
        if(!my_obj->ctx) {
          FREE_JLOG_OBJ(my_obj);
          croak("jlog_new(%s) failed after successful init", path);
        }
      }
      RETVAL = newSV(0);
      sv_setref_pv(RETVAL, classname, (void *)my_obj);
    }
  OUTPUT:
    RETVAL
92
SV *JLOG_JLOG_BEGIN()
  CODE:
    /* Expose the C constant JLOG_BEGIN to Perl as an integer. */
    RETVAL = newSViv(JLOG_BEGIN);
  OUTPUT:
    RETVAL
100    
SV *JLOG_JLOG_END()
  CODE:
    /* Expose the C constant JLOG_END to Perl as an integer. */
    RETVAL = newSViv(JLOG_END);
  OUTPUT:
    RETVAL
108
109
SV *JLOG_add_subscriber(my_obj, subscriber, ...)
  JLog my_obj;
  char *subscriber;
  CODE:
    {
      /*
       * $log->add_subscriber($name [, $whence]).
       * $whence defaults to JLOG_BEGIN. Returns a true value when the
       * subscriber was added and a false value otherwise, including when
       * the handle is invalid or already closed.
       */
      int position = JLOG_BEGIN;
      if(items > 2) {
        position = SvIV(ST(2));
      }
      if(my_obj && my_obj->ctx &&
         jlog_ctx_add_subscriber(my_obj->ctx, subscriber, position) == 0)
      {
        RETVAL = &PL_sv_yes;
      } else {
        RETVAL = &PL_sv_no;
      }
    }
  OUTPUT:
    RETVAL
129
SV *JLOG_remove_subscriber(my_obj, subscriber)
  JLog my_obj;
  char *subscriber;
  CODE:
    /*
     * $log->remove_subscriber($name).
     * Returns a true value when the subscriber was removed and a false
     * value otherwise, including when the handle is invalid or closed.
     */
    if(my_obj && my_obj->ctx &&
       jlog_ctx_remove_subscriber(my_obj->ctx, subscriber) == 0)
    {
      RETVAL = &PL_sv_yes;
    } else {
      RETVAL = &PL_sv_no;
    }
  OUTPUT:
    RETVAL
145
void JLOG_list_subscribers(my_obj)
  JLog my_obj;
  PPCODE:
    {
      /*
       * $log->list_subscribers: returns the subscriber names as a list of
       * strings. Croaks if the handle is invalid or the list cannot be
       * obtained.
       */
      char **list = NULL;
      int i;
      if(!my_obj || !my_obj->ctx) {
        croak("invalid jlog context");
      }
      /* Never walk `list` unless the call succeeded and actually set it.
       * NOTE(review): a negative return is assumed to signal failure;
       * confirm against the jlog headers. */
      if(jlog_ctx_list_subscribers(my_obj->ctx, &list) < 0 || !list) {
        SYS_CROAK("jlog_ctx_list_subscribers failed");
      }
      /* the array is terminated by a NULL entry */
      for(i=0; list[i]; i++) {
        XPUSHs(sv_2mortal(newSVpv(list[i], 0)));
      }
      jlog_ctx_list_subscribers_dispose(my_obj->ctx, list);
    }
161
SV *JLOG_alter_journal_size(my_obj, size)
  JLog my_obj;
  size_t size;
  CODE:
    /*
     * Deprecated no-op. Changing the journal size on an existing object
     * can never take effect (it is either too early or too late), so this
     * always reports failure. `size` is accepted but unused. Croaks only
     * when the handle is invalid or closed.
     */
    if(!(my_obj && my_obj->ctx)) {
      croak("invalid jlog context");
    }
    RETVAL = &PL_sv_no;
  OUTPUT:
    RETVAL
177
SV *JLOG_raw_size(my_obj)
  JLog my_obj;
  CODE:
    {
      /*
       * $log->raw_size: returns the value reported by jlog_raw_size() as
       * an integer. Croaks when the handle is invalid or closed.
       */
      size_t raw_bytes;
      if(!(my_obj && my_obj->ctx)) {
        croak("invalid jlog context");
      }
      raw_bytes = jlog_raw_size(my_obj->ctx);
      RETVAL = newSViv(raw_bytes);
    }
  OUTPUT:
    RETVAL
191  
void JLOG_close(my_obj)
  JLog my_obj;
  CODE:
    /*
     * $log->close: close the underlying jlog context. Safe to call on an
     * invalid or already closed handle, in which case nothing happens.
     *
     * Written without an early `return`: a bare return inside an XSUB
     * skips the XSRETURN bookkeeping emitted after the CODE: section and
     * leaves the argument on the Perl stack as a bogus return value.
     */
    if(my_obj && my_obj->ctx) {
      jlog_ctx_close(my_obj->ctx);
      /* mark as closed so DESTROY does not close the context again */
      my_obj->ctx = NULL;
    }
200
SV* JLOG_inspect(my_obj)
  JLog my_obj;
  CODE:
    {
      /*
       * $log->inspect: returns a hash reference describing the object's
       * internal state for debugging. Keys: start, last, prev, end (log
       * ids formatted by jlog_snprint_logid) and path.
       *
       * Only the object itself is required; the jlog context is not
       * touched, so this also works after close(). Croaks on a NULL
       * object, like the other methods.
       */
      HV *rh;
      char start[20], last[20], prev[20], end[20];
      if(!my_obj) {
        croak("invalid jlog context");
      }
      /* the hash is mortal; the reference created below keeps it alive */
      rh = (HV *)sv_2mortal((SV *)newHV());
      jlog_snprint_logid(start, sizeof(start), &my_obj->start);
      hv_store(rh, "start", sizeof("start") - 1, newSVpv(start, 0), 0);

      jlog_snprint_logid(last, sizeof(last), &my_obj->last);
      hv_store(rh, "last", sizeof("last") - 1, newSVpv(last, 0), 0);

      jlog_snprint_logid(prev, sizeof(prev), &my_obj->prev);
      hv_store(rh, "prev", sizeof("prev") - 1, newSVpv(prev, 0), 0);

      jlog_snprint_logid(end, sizeof(end), &my_obj->end);
      hv_store(rh, "end", sizeof("end") - 1, newSVpv(end, 0), 0);

      hv_store(rh, "path", sizeof("path") - 1, newSVpv(my_obj->path, 0), 0);
      RETVAL = newRV((SV *)rh);
    }
  OUTPUT:
    RETVAL
225
void JLOG_DESTROY(my_obj)
  JLog my_obj;
  CODE:
    /*
     * Destructor: closes the jlog context if it is still open and frees
     * the object and its path copy. A NULL object is ignored.
     *
     * Written without an early `return` so the XSRETURN bookkeeping
     * emitted after the CODE: section always runs.
     */
    if(my_obj) {
      FREE_JLOG_OBJ(my_obj);
    }
233
234
235 MODULE = JLog PACKAGE = JLog::Writer PREFIX=JLOG_W_
236
237
SV *JLOG_W_open(my_obj)
  JLog_Writer my_obj;
  CODE:
    /*
     * $writer->open: open the log for writing. Returns a copy of the
     * invocant so calls can be chained. Croaks when the handle is invalid
     * or when jlog_ctx_open_writer fails.
     */
    if(!(my_obj && my_obj->ctx)) {
      croak("invalid jlog context");
    }
    if(jlog_ctx_open_writer(my_obj->ctx) == 0) {
      RETVAL = newSVsv(ST(0));
    } else {
      SYS_CROAK("jlog_ctx_open_writer failed");
    }
  OUTPUT:
    RETVAL
253
SV *JLOG_W_write(my_obj, buffer_sv, ...)
  JLog_Writer my_obj;
  SV *buffer_sv;
  CODE:
    {
      /*
       * $writer->write($buffer [, $timestamp]).
       *
       * Writes the string value of $buffer as one message. When a
       * non-zero $timestamp (seconds) is supplied it is passed along as
       * the message time; otherwise NULL is passed and the library picks
       * the time. Returns a true value on success, a false value on
       * failure. Croaks when the handle is invalid or closed.
       */
      char *buffer;
      /* kept as time_t so wide timestamps are not truncated through int */
      time_t ts = 0;
      jlog_message m;
      struct timeval t;
      STRLEN buffer_len;

      if(!my_obj || !my_obj->ctx) {
        croak("invalid jlog context");
      }
      if(items > 2) {
        ts = (time_t) SvIV(ST(2));
      }

      buffer = SvPVx(buffer_sv, buffer_len);
      m.mess = buffer;
      m.mess_len = buffer_len;
      t.tv_sec = ts;
      t.tv_usec = 0;

      if(jlog_ctx_write_message(my_obj->ctx, &m, ts?&t:NULL) < 0) {
        RETVAL = &PL_sv_no;
      } else {
        RETVAL = &PL_sv_yes;
      }
    }
  OUTPUT:
    RETVAL
286
287
288 MODULE = JLog PACKAGE = JLog::Reader PREFIX=JLOG_R_
289
290
SV *JLOG_R_open(my_obj, subscriber)
  JLog_Reader my_obj;
  char *subscriber;
  CODE:
    /*
     * $reader->open($subscriber): open the log for reading as the named
     * subscriber. Returns a copy of the invocant so calls can be chained.
     * Croaks when the handle is invalid or jlog_ctx_open_reader fails.
     */
    if(!(my_obj && my_obj->ctx)) {
      croak("invalid jlog context");
    }
    if(jlog_ctx_open_reader(my_obj->ctx, subscriber) == 0) {
      RETVAL = newSVsv(ST(0));
    } else {
      SYS_CROAK("jlog_ctx_open_reader failed");
    }
  OUTPUT:
    RETVAL
307
SV * JLOG_R_read(my_obj)
  JLog_Reader my_obj;
  CODE:
    {
      /*
       * $reader->read: returns the next message as a string, or undef
       * when there is currently nothing to read.
       *
       * State kept on the object between calls:
       *   start/end - interval obtained from jlog_ctx_read_interval; an
       *               all-zero start means "interval must be (re)read".
       *   last      - id of the message returned by the previous call.
       *   prev      - value `last` had before that call (used by rewind).
       *   error     - set before croaking on a failed read so that the
       *               next call re-reads the interval.
       *
       * Croaks when the handle is invalid, or when reading the interval,
       * reading the message, or auto-checkpointing fails.
       */
      const jlog_id epoch = { 0, 0 };
      jlog_id cur;
      jlog_message message;
      int cnt;
      if(!my_obj || !my_obj->ctx) {
        croak("invalid jlog context");
      }
      /* if start is unset, we need to read the interval (again) */
      if(my_obj->error || !memcmp(&my_obj->start, &epoch, sizeof(jlog_id)))
      {
        my_obj->error = 0;
        cnt = jlog_ctx_read_interval(my_obj->ctx, &my_obj->start, &my_obj->end);
        if(cnt == 0 || (cnt == -1 && jlog_ctx_err(my_obj->ctx) == JLOG_ERR_FILE_OPEN)) {
          /* nothing available (or the file could not be opened): not an
           * error, report "no message" and retry from scratch next time */
          my_obj->start = epoch;
          my_obj->end = epoch;
          RETVAL = &PL_sv_undef;
          goto end;
        }
        else if(cnt == -1) {
          SYS_CROAK("jlog_ctx_read_interval failed");
        }
      }
      /* if last is unset, start at the beginning */
      if(!memcmp(&my_obj->last, &epoch, sizeof(jlog_id))) {
        cur = my_obj->start;
      } else {
        /* if we've already read the end, return; otherwise advance */
        cur = my_obj->last;
        if(!memcmp(&my_obj->prev, &my_obj->end, sizeof(jlog_id))) {
          my_obj->start = epoch;
          my_obj->end = epoch;
          RETVAL = &PL_sv_undef;
          goto end;
        }
        jlog_ctx_advance_id(my_obj->ctx, &my_obj->last, &cur, &my_obj->end);
        /* no movement means there is nothing after `last` */
        if(!memcmp(&my_obj->last, &cur, sizeof(jlog_id))) {
          my_obj->start = epoch;
          my_obj->end = epoch;
          RETVAL = &PL_sv_undef;
          goto end;
        }
      }
      if(jlog_ctx_read_message(my_obj->ctx, &cur, &message) != 0) {
        if(jlog_ctx_err(my_obj->ctx) == JLOG_ERR_FILE_OPEN) {
          my_obj->start = epoch;
          my_obj->end = epoch;
          RETVAL = &PL_sv_undef;
          goto end;
        }
        /* read failed; croak, but recover if the read is retried */
        my_obj->error = 1;
        SYS_CROAK("read failed");
      }
      if(my_obj->auto_checkpoint) {
        if(jlog_ctx_read_checkpoint(my_obj->ctx, &cur) != 0) {
          SYS_CROAK("checkpoint failed");
        }
        /* we have to re-read the interval after a checkpoint */
        my_obj->last = epoch;
        my_obj->prev = epoch;
        my_obj->start = epoch;
        my_obj->end = epoch;
      } else {
        /* remember where we are; the interval is not cleared here, a
         * later call clears it once it finds nothing further to read */
        my_obj->prev = my_obj->last;
        my_obj->last = cur;
      }
      /* newSVpvn, not newSVpv: newSVpv treats a length of 0 as "call
       * strlen()", which would misread a zero-length message */
      RETVAL = newSVpvn(message.mess, (STRLEN)message.mess_len);
end:
      ;
    }
  OUTPUT:
    RETVAL
383
SV *JLOG_R_rewind(my_obj)
  JLog_Reader my_obj;
  CODE:
    /*
     * $reader->rewind: step the read position back by restoring `last`
     * from `prev`. Returns a copy of the invocant so calls can be
     * chained. Croaks when the handle is invalid or closed.
     */
    if(!(my_obj && my_obj->ctx)) {
      croak("invalid jlog context");
    }
    my_obj->last = my_obj->prev;
    RETVAL = newSVsv(ST(0));
  OUTPUT:
    RETVAL
396
SV *JLOG_R_checkpoint(my_obj)
  JLog_Reader my_obj;
  CODE:
    {
      /*
       * $reader->checkpoint: checkpoint the subscriber at the last
       * message returned by read. Does nothing when no message has been
       * read since the previous checkpoint. Returns a copy of the
       * invocant so calls can be chained.
       *
       * Croaks when the handle is invalid or the checkpoint fails. On
       * failure the read position is left untouched so the caller can
       * retry, matching the auto-checkpoint path in read.
       */
      const jlog_id epoch = { 0, 0 };
      if(!my_obj || !my_obj->ctx) {
        croak("invalid jlog context");
      }
      if(memcmp(&my_obj->last, &epoch, sizeof(jlog_id)))
      {
        if(jlog_ctx_read_checkpoint(my_obj->ctx, &my_obj->last) != 0) {
          SYS_CROAK("checkpoint failed");
        }
        /* we have to re-read the interval after a checkpoint */
        my_obj->last = epoch;
        my_obj->start = epoch;
        my_obj->end = epoch;
      }
      RETVAL = newSVsv(ST(0));
    }
  OUTPUT:
    RETVAL
417
SV *JLOG_R_auto_checkpoint(my_obj, ...)
  JLog_Reader my_obj;
  CODE:
    /*
     * $reader->auto_checkpoint([$flag]): combined getter/setter for the
     * auto-checkpoint flag. With an argument the flag is updated first;
     * the current value is always returned. Croaks when the handle is
     * invalid or closed.
     */
    if(!(my_obj && my_obj->ctx)) {
      croak("invalid jlog context");
    }
    if(items > 1) {
      my_obj->auto_checkpoint = SvIV(ST(1));
    }
    RETVAL = newSViv(my_obj->auto_checkpoint);
  OUTPUT:
    RETVAL
Note: See TracBrowser for help on using the browser.