root/jlog.c

Revision 81ac86a15e3940ff0b787698853fb6a6157f800b, 37.2 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 7 years ago)

initial import with Ecelerity bits removed and some autoconf glue added in. Could certainly use some work on the build/install. Needs shared lib support for multiple platforms

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2001-2006 OmniTI, Inc. All rights reserved
3  *
4  * THIS IS UNPUBLISHED PROPRIETARY SOURCE CODE OF OMNITI
5  * The copyright notice above does not evidence any
6  * actual or intended publication of such source code.
7  *
8  * Redistribution of this material is strictly prohibited.
9  *
10  */
11
12 /*****************************************************************
13
14   Journaled logging... append only.
15
16       (1) find current file, or allocate a file, extendible and mark
17           it current.
18  
19       (2) Write records to it, records include their size, so
20           a simple inspection can detect an incomplete trailing
21           record.
22    
23       (3) Write append until the file reaches a certain size.
24
25       (4) Allocate a file, extensible.
26
27       (5) RESYNC INDEX on 'finished' file (see reading:3) and postpend
28           an offset '0' to the index.
29    
30       (6) goto (1)
31    
32   Reading journals...
33
34       (1) find oldest checkpoint of all subscribers, remove all older files.
35
36       (2) (file, last_read) = find_checkpoint for this subscriber
37
38       (3) RESYNC INDEX:
39           open record index for file, seek to the end -  off_t.
40           this is the offset of the last noticed record in this file.
41           open file, seek to this point, roll forward writing the index file
42           _do not_ write an offset for the last record unless it is found
43           complete.
44
45       (4) read entries from last_read+1 -> index of record index
46
47 */
48 #include <stdio.h>
49
50 #include "jlog_config.h"
51 #include "jlog_private.h"
52 #if HAVE_DIRENT_H
53 #include <dirent.h>
54 #endif
55 #if HAVE_FCNTL_H
56 #include <fcntl.h>
57 #endif
58 #if HAVE_ERRNO_H
59 #include <errno.h>
60 #endif
61 #if HAVE_TIME_H
62 #include <time.h>
63 #endif
64
65 #define BUFFERED_INDICES 1024
66
67 static jlog_file *__jlog_open_writer(jlog_ctx *ctx);
68 static int __jlog_close_writer(jlog_ctx *ctx);
69 static jlog_file *__jlog_open_reader(jlog_ctx *ctx, u_int32_t log);
70 static int __jlog_close_reader(jlog_ctx *ctx);
71 static int __jlog_close_checkpoint(jlog_ctx *ctx);
72 static jlog_file *__jlog_open_indexer(jlog_ctx *ctx, u_int32_t log);
73 static int __jlog_close_indexer(jlog_ctx *ctx);
74 static int __jlog_resync_index(jlog_ctx *ctx, u_int32_t log, jlog_id *last, int *c);
75 static jlog_file *__jlog_open_named_checkpoint(jlog_ctx *ctx, const char *cpname, int flags);
76 static int __jlog_mmap_reader(jlog_ctx *ctx, u_int32_t log);
77 static int __jlog_munmap_reader(jlog_ctx *ctx);
78
79 int jlog_snprint_logid(char *b, int n, const jlog_id *id) {
80   return snprintf(b, n, "%08x:%08x", id->log, id->marker);
81 }
82
/*
 * Scan data segment `log` for byte ranges that do not parse as a chain of
 * message headers, then compact the file in place by cutting those ranges
 * out and truncating it.
 *
 * Returns the number of invalid ranges that were tagged (0 when the file
 * was already clean), or -1 when opening, locking, mapping, reading,
 * writing or truncating failed (ctx->last_error is set in that case).
 *
 * NOTE(review): SYS_FAIL is defined outside this file; from its use here it
 * presumably records the error on ctx and jumps to `finish` -- confirm.
 */
int jlog_repair_datafile(jlog_ctx *ctx, u_int32_t log)
{
  jlog_message_header hdr;
  char *this, *next, *afternext, *mmap_end;
  int i, invalid_count = 0;
  /* list of [start, end) file offsets that must be cut out */
  struct {
    off_t start, end;
  } *invalid = NULL;
  off_t orig_len, src, dst, len;

/* Append the byte range [s, e), given as pointers into the mapping, to the
 * `invalid` list, growing the list by one element each time.
 * NOTE(review): the malloc/realloc result is used without a NULL check. */
#define TAG_INVALID(s, e) do { \
  if (invalid_count) \
    invalid = realloc(invalid, (invalid_count + 1) * sizeof(*invalid)); \
  else \
    invalid = malloc(sizeof(*invalid)); \
  invalid[invalid_count].start = s - (char *)ctx->mmap_base; \
  invalid[invalid_count].end = e - (char *)ctx->mmap_base; \
  invalid_count++; \
} while (0)

  ctx->last_error = JLOG_ERR_SUCCESS;

  /* we want the reader's open logic because this runs in the read path
   * the underlying fds are always RDWR anyway */
  __jlog_open_reader(ctx, log);
  if (!ctx->data) {
    ctx->last_error = JLOG_ERR_FILE_OPEN;
    ctx->last_errno = errno;
    return -1;
  }
  if (!jlog_file_lock(ctx->data)) {
    ctx->last_error = JLOG_ERR_LOCK;
    ctx->last_errno = errno;
    return -1;
  }
  if (__jlog_mmap_reader(ctx, log) != 0)
    SYS_FAIL(JLOG_ERR_FILE_READ);

  /* remember the mapped length: the mapping is dropped before compaction */
  orig_len = ctx->mmap_len;
  mmap_end = ctx->mmap_base + ctx->mmap_len;
  /* these values will cause us to fall right into the error clause and
   * start searching for a valid header from offset 0 */
  this = ctx->mmap_base - sizeof(hdr);
  hdr.reserved = 0;
  hdr.mlen = 0;

  /* Walk the header chain: `this` is the record whose header is currently
   * in `hdr`, `next` is where the following header should start. */
  while (this + sizeof(hdr) <= mmap_end) {
    next = this + sizeof(hdr) + hdr.mlen;
    /* a record that ends at or before the start of the map means the
     * length wrapped (or this is the synthetic first iteration) */
    if (next <= (char *)ctx->mmap_base) goto error;
    if (next == mmap_end) {
      /* the current record ends exactly at EOF: the file is fully parsed */
      this = next;
      break;
    }
    /* not enough room left for another complete header */
    if (next + sizeof(hdr) > mmap_end) goto error;
    memcpy(&hdr, next, sizeof(hdr));
    /* a valid header must have a zero `reserved` field */
    if (hdr.reserved != 0) goto error;
    this = next;
    continue;
  error:
    /* Resynchronise: slide forward one byte at a time looking for four zero
     * bytes (presumably the zeroed `reserved` field at the start of a
     * header -- confirm against jlog_message_header), then accept the
     * candidate only if the record it describes ends exactly at EOF or is
     * itself followed by a header whose `reserved` is zero. */
    for (next = this + sizeof(hdr); next + sizeof(hdr) <= mmap_end; next++) {
      if (!next[0] && !next[1] && !next[2] && !next[3]) {
        memcpy(&hdr, next, sizeof(hdr));
        afternext = next + sizeof(hdr) + hdr.mlen;
        if (afternext <= (char *)ctx->mmap_base) continue;
        if (afternext == mmap_end) break;
        if (afternext + sizeof(hdr) > mmap_end) continue;
        memcpy(&hdr, afternext, sizeof(hdr));
        if (hdr.reserved == 0) break;
      }
    }
    /* correct for while loop entry condition */
    if (this < (char *)ctx->mmap_base) this = ctx->mmap_base;
    /* the search ran off the end without finding a plausible header; the
     * remainder is tagged invalid after the loop */
    if (next + sizeof(hdr) > mmap_end) break;
    /* everything between the last good position and the recovered header
     * is garbage */
    if (next > this) TAG_INVALID(this, next);
    /* resume at the record after the recovered one; `hdr` already holds its
     * header unless it sits exactly at EOF */
    this = afternext;
  }
  /* anything left after the last good record is junk as well */
  if (this != mmap_end) TAG_INVALID(this, mmap_end);

#undef TAG_INVALID

/* Copy `len` bytes from file offset `src` to file offset `dst` through a
 * 4 KiB stack buffer, advancing src and dst and consuming len as it goes. */
#define MOVE_SEGMENT do { \
  char cpbuff[4096]; \
  off_t chunk; \
  while(len > 0) { \
    chunk = len; \
    if (chunk > sizeof(cpbuff)) chunk = sizeof(cpbuff); \
    if (!jlog_file_pread(ctx->data, &cpbuff, chunk, src)) \
      SYS_FAIL(JLOG_ERR_FILE_READ); \
    if (!jlog_file_pwrite(ctx->data, &cpbuff, chunk, dst)) \
      SYS_FAIL(JLOG_ERR_FILE_WRITE); \
    src += chunk; \
    dst += chunk; \
    len -= chunk; \
  } \
} while (0)

  if (invalid_count > 0) {
    /* compaction goes through pread/pwrite, so drop the mapping first */
    __jlog_munmap_reader(ctx);
    src = 0;
    /* bytes before the first invalid range are already in place */
    dst = invalid[0].start;
    /* slide each valid run that lies between two invalid ranges down */
    for (i = 0; i < invalid_count - 1; ) {
      src = invalid[i].end;
      len = invalid[++i].start - src;
      MOVE_SEGMENT;
    }
    /* then the valid tail after the last invalid range, if there is one */
    src = invalid[invalid_count - 1].end;
    len = orig_len - src;
    if (len > 0) MOVE_SEGMENT;
    /* dst now marks the end of the compacted data */
    if (!jlog_file_truncate(ctx->data, dst))
      SYS_FAIL(JLOG_ERR_FILE_WRITE);
  }

#undef MOVE_SEGMENT

finish:
  jlog_file_unlock(ctx->data);
  if (invalid) free(invalid);
  if (ctx->last_error != JLOG_ERR_SUCCESS) return -1;
  return invalid_count;
}
203
204 int jlog_inspect_datafile(jlog_ctx *ctx, u_int32_t log)
205 {
206   jlog_message_header hdr;
207   char *this, *next, *mmap_end;
208   int i;
209   time_t timet;
210   struct tm tm;
211   char tbuff[128];
212
213   ctx->last_error = JLOG_ERR_SUCCESS;
214
215   __jlog_open_reader(ctx, log);
216   if (!ctx->data)
217     SYS_FAIL(JLOG_ERR_FILE_OPEN);
218   if (__jlog_mmap_reader(ctx, log) != 0)
219     SYS_FAIL(JLOG_ERR_FILE_READ);
220
221   mmap_end = ctx->mmap_base + ctx->mmap_len;
222   this = ctx->mmap_base;
223   i = 0;
224   while (this + sizeof(hdr) <= mmap_end) {
225     memcpy(&hdr, this, sizeof(hdr));
226     i++;
227     if (hdr.reserved != 0) {
228       fprintf(stderr, "Message %d at [%ld] has invalid reserved value %u\n",
229               i, this - (char *)ctx->mmap_base, hdr.reserved);
230       return 1;
231     }
232
233     fprintf(stderr, "Message %d at [%ld] of (%lu+%u)", i,
234             this - (char *)ctx->mmap_base, sizeof(hdr), hdr.mlen);
235
236     next = this + sizeof(hdr) + hdr.mlen;
237     if (next <= (char *)ctx->mmap_base) {
238       fprintf(stderr, " WRAPPED TO NEGATIVE OFFSET!\n");
239       return 1;
240     }
241     if (next > mmap_end) {
242       fprintf(stderr, " OFF THE END!\n");
243       return 1;
244     }
245
246     timet = hdr.tv_sec;
247     localtime_r(&timet, &tm);
248     strftime(tbuff, sizeof(tbuff), "%c", &tm);
249     fprintf(stderr, "\n\ttime: %s\n\tmlen: %u\n", tbuff, hdr.mlen);
250     this = next;
251   }
252   if (this < mmap_end) {
253     fprintf(stderr, "%ld bytes of junk at the end\n", mmap_end - this);
254     return 1;
255   }
256
257   return 0;
258 finish:
259   return -1;
260 }
261
262 int jlog_idx_details(jlog_ctx *ctx, u_int32_t log,
263                      u_int32_t *marker, int *closed)
264 {
265   off_t index_len;
266   u_int64_t index;
267
268   __jlog_open_indexer(ctx, log);
269   if (!ctx->index)
270     SYS_FAIL(JLOG_ERR_IDX_OPEN);
271   if ((index_len = jlog_file_size(ctx->index)) == -1)
272     SYS_FAIL(JLOG_ERR_IDX_SEEK);
273   if (index_len % sizeof(u_int64_t))
274     SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
275   if (index_len > sizeof(u_int64_t)) {
276     if (!jlog_file_pread(ctx->index, &index, sizeof(u_int64_t),
277                          index_len - sizeof(u_int64_t)))
278     {
279       SYS_FAIL(JLOG_ERR_IDX_READ);
280     }
281     if (index) {
282       *marker = index_len / sizeof(u_int64_t);
283       *closed = 0;
284     } else {
285       *marker = (index_len / sizeof(u_int64_t)) - 1;
286       *closed = 1;
287     }
288   } else {
289     *marker = index_len / sizeof(u_int64_t);
290     *closed = 0;
291   }
292
293   return 0;
294 finish:
295   return -1;
296 }
297
298 static int __jlog_unlink_datafile(jlog_ctx *ctx, u_int32_t log) {
299   char file[MAXPATHLEN];
300
301   if(ctx->current_log == log) {
302     __jlog_close_reader(ctx);
303     __jlog_close_indexer(ctx);
304   }
305
306   STRSETDATAFILE(ctx, file, log);
307 #ifdef DEBUG
308   fprintf(stderr, "unlinking %s\n", file);
309 #endif
310   unlink(file);
311
312   strcat(file, INDEX_EXT);
313 #ifdef DEBUG
314   fprintf(stderr, "unlinking %s\n", file);
315 #endif
316   unlink(file);
317   return 0;
318 }
319
320 static int __jlog_open_metastore(jlog_ctx *ctx)
321 {
322   char file[MAXPATHLEN];
323   int len;
324
325 #ifdef DEBUG
326   fprintf(stderr, "__jlog_open_metastore\n");
327 #endif
328   strcpy(file, ctx->path);
329   len = strlen(ctx->path);
330   file[len++] = IFS_CH;
331   strcpy(&file[len], "metastore");
332
333   ctx->metastore = jlog_file_open(file, O_CREAT, ctx->file_mode);
334
335   if (!ctx->metastore) {
336     ctx->last_errno = errno;
337     ctx->last_error = JLOG_ERR_CREATE_META;
338     return -1;
339   }
340
341   return 0;
342 }
343
344 /* exported */
345 int __jlog_pending_readers(jlog_ctx *ctx, u_int32_t log) {
346   int readers;
347   DIR *dir;
348   struct dirent *ent;
349   char file[MAXPATHLEN];
350   int len;
351   jlog_id id;
352
353   readers = 0;
354
355   dir = opendir(ctx->path);
356   if (!dir) return -1;
357  
358   strcpy(file, ctx->path);
359   len = strlen(ctx->path);
360   file[len++] = IFS_CH;
361   file[len] = '\0';
362
363   while ((ent = readdir(dir))) {
364     if (ent->d_name[0] == 'c' && ent->d_name[1] == 'p' && ent->d_name[2] == '.') {
365       jlog_file *cp;
366
367       strcpy(file + len, ent->d_name);
368 #ifdef DEBUG
369       fprintf(stderr, "Checking if %s needs %s...\n", ent->d_name, ctx->path);
370 #endif
371       if ((cp = jlog_file_open(file, 0, ctx->file_mode))) {
372         if (jlog_file_lock(cp)) {
373           jlog_file_pread(cp, &id, sizeof(id), 0);
374 #ifdef DEBUG
375           fprintf(stderr, "\t%u <= %u (pending reader)\n", id.log, log);
376 #endif
377           if (id.log <= log) {
378             readers++;
379           }
380           jlog_file_unlock(cp);
381         }
382         jlog_file_close(cp);
383       }
384     }
385   }
386   closedir(dir);
387   return readers;
388 }
/* Growable array of strings with separate used/allocated counts. */
struct _jlog_subs {
  char **subs;   /* the string pointers */
  int used;      /* number of slots currently filled */
  int allocd;    /* number of slots allocated in subs */
};
394
395 int jlog_ctx_list_subscribers_dispose(jlog_ctx *ctx, char **subs) {
396   char *s;
397   int i = 0;
398   if(subs) {
399     while((s = subs[i++]) != NULL) free(s);
400     free(subs);
401   }
402   return 0;
403 }
404
405 int jlog_ctx_list_subscribers(jlog_ctx *ctx, char ***subs) {
406   struct _jlog_subs js = { NULL, 0, 0 };
407   DIR *dir;
408   struct dirent *ent;
409   unsigned char file[MAXPATHLEN];
410   char *p;
411   int len;
412
413   js.subs = calloc(16, sizeof(char *));
414   js.allocd = 16;
415
416   dir = opendir(ctx->path);
417   if (!dir) return -1;
418   while ((ent = readdir(dir))) {
419     if (ent->d_name[0] == 'c' && ent->d_name[1] == 'p' && ent->d_name[2] == '.') {
420
421       for (len = 0, p = ent->d_name + 3; *p;) {
422         unsigned char c;
423         int i;
424
425         for (c = 0, i = 0; i < 16; i++) {
426           if (__jlog_hexchars[i] == *p) {
427             c = i << 4;
428             break;
429           }
430         }
431         p++;
432         for (i = 0; i < 16; i++) {
433           if (__jlog_hexchars[i] == *p) {
434             c |= i;
435             break;
436           }
437         }
438         p++;
439         file[len++] = c;
440       }
441       file[len] = '\0';
442
443       js.subs[js.used++] = strdup((char *)file);
444       if(js.used == js.allocd) {
445         js.allocd *= 2;
446         js.subs = realloc(js.subs, js.allocd*sizeof(char *));
447       }
448       js.subs[js.used] = NULL;
449     }
450   }
451   closedir(dir);
452   *subs = js.subs;
453   return js.used;
454 }
455
456 static int __jlog_save_metastore(jlog_ctx *ctx, int ilocked)
457 {
458   struct _jlog_meta_info info;
459 #ifdef DEBUG
460   fprintf(stderr, "__jlog_save_metastore\n");
461 #endif
462
463   if (!ilocked && !jlog_file_lock(ctx->metastore)) {
464     return -1;
465   }
466
467   info.storage_log = ctx->storage.log;
468   info.unit_limit = ctx->unit_limit;
469   info.safety = ctx->safety;
470
471   if (!jlog_file_pwrite(ctx->metastore, &info, sizeof(info), 0)) {
472     if (!ilocked) jlog_file_unlock(ctx->metastore);
473     return -1;
474   }
475   if (ctx->safety == JLOG_SAFE) {
476     jlog_file_sync(ctx->metastore);
477   }
478
479   if (!ilocked) jlog_file_unlock(ctx->metastore);
480   return 0;
481 }
482
483 static int __jlog_restore_metastore(jlog_ctx *ctx, int ilocked)
484 {
485   struct _jlog_meta_info info;
486 #ifdef DEBUG
487   fprintf(stderr, "__jlog_restore_metastore\n");
488 #endif
489
490   if (!ilocked && !jlog_file_lock(ctx->metastore)) {
491     return -1;
492   }
493
494   if (!jlog_file_pread(ctx->metastore, &info, sizeof(info), 0)) {
495     if (!ilocked) jlog_file_unlock(ctx->metastore);
496     return -1;
497   }
498
499   if (!ilocked) jlog_file_unlock(ctx->metastore);
500
501   ctx->storage.log = info.storage_log;
502   ctx->unit_limit = info.unit_limit;
503   ctx->safety = info.safety;
504
505   return 0;
506 }
507
508 int jlog_get_checkpoint(jlog_ctx *ctx, const char *s, jlog_id *id) {
509   jlog_file *f;
510   int rv = -1;
511
512   if(ctx->subscriber_name && !strcmp(ctx->subscriber_name, s)) {
513     if(!ctx->checkpoint) {
514       ctx->checkpoint = __jlog_open_named_checkpoint(ctx, s, 0);
515     }
516     f = ctx->checkpoint;
517   } else
518     f = __jlog_open_named_checkpoint(ctx, s, 0);
519
520   if (f) {
521     if (jlog_file_lock(f)) {
522       if (jlog_file_pread(f, id, sizeof(*id), 0)) rv = 0;
523       jlog_file_unlock(f);
524     }
525   }
526   if (f && f != ctx->checkpoint) jlog_file_close(f);
527   return rv;
528 }
529
530 static int __jlog_set_checkpoint(jlog_ctx *ctx, const char *s, const jlog_id *id)
531 {
532   jlog_file *f;
533   int rv = -1;
534   jlog_id old_id;
535   u_int32_t log;
536
537   if(ctx->subscriber_name && !strcmp(ctx->subscriber_name, s)) {
538     if(!ctx->checkpoint) {
539       ctx->checkpoint = __jlog_open_named_checkpoint(ctx, s, 0);
540     }
541     f = ctx->checkpoint;
542   } else
543     f = __jlog_open_named_checkpoint(ctx, s, 0);
544
545   if(!f) return -1;
546   if (!jlog_file_lock(f))
547     goto failset;
548
549   if (jlog_file_size(f) == 0) {
550     /* we're setting it for the first time, no segments were pending on it */
551     old_id.log = id->log;
552   } else {
553     if (!jlog_file_pread(f, &old_id, sizeof(old_id), 0))
554       goto failset;
555   }
556   if (!jlog_file_pwrite(f, id, sizeof(*id), 0))
557     goto failset;
558   if (ctx->safety == JLOG_SAFE) {
559     jlog_file_sync(f);
560   }
561   jlog_file_unlock(f);
562   rv = 0;
563
564   for (log = old_id.log; log < id->log; log++) {
565     if (__jlog_pending_readers(ctx, log) == 0) {
566       __jlog_unlink_datafile(ctx, log);
567     }
568   }
569
570  failset:
571   if (f && f != ctx->checkpoint) jlog_file_close(f);
572   return rv;
573 }
574
575 static int __jlog_close_metastore(jlog_ctx *ctx) {
576   if (ctx->metastore) {
577     jlog_file_close(ctx->metastore);
578     ctx->metastore = NULL;
579   }
580   return 0;
581 }
582
583 /* path is assumed to be MAXPATHLEN */
584 static char *compute_checkpoint_filename(jlog_ctx *ctx, const char *subscriber, char *name)
585 {
586   const char *sub;
587   int len;
588
589   /* build checkpoint filename */
590   len = strlen(ctx->path);
591   strcpy(name, ctx->path);
592   name[len++] = IFS_CH;
593   name[len++] = 'c';
594   name[len++] = 'p';
595   name[len++] = '.';
596   for (sub = subscriber; *sub; ) {
597     name[len++] = __jlog_hexchars[((*sub & 0xf0) >> 4)];
598     name[len++] = __jlog_hexchars[(*sub & 0x0f)];
599     sub++;
600   }
601   name[len++] = '\0';
602
603 #ifdef DEBUG
604   fprintf(stderr, "checkpoint %s filename is %s\n", subscriber, name);
605 #endif
606   return name;
607 }
608
609 static jlog_file *__jlog_open_named_checkpoint(jlog_ctx *ctx, const char *cpname, int flags)
610 {
611   char name[MAXPATHLEN];
612   compute_checkpoint_filename(ctx, cpname, name);
613   return jlog_file_open(name, flags, ctx->file_mode);
614 }
615
616 static jlog_file *__jlog_open_reader(jlog_ctx *ctx, u_int32_t log) {
617   char file[MAXPATHLEN];
618
619   if(ctx->current_log != log) {
620     __jlog_close_reader(ctx);
621     __jlog_close_indexer(ctx);
622   }
623   if(ctx->data) {
624     return ctx->data;
625   }
626   STRSETDATAFILE(ctx, file, log);
627 #ifdef DEBUG
628   fprintf(stderr, "opening log file[ro]: '%s'\n", file);
629 #endif
630   ctx->data = jlog_file_open(file, 0, ctx->file_mode);
631   ctx->current_log = log;
632   return ctx->data;
633 }
634
635 static int __jlog_munmap_reader(jlog_ctx *ctx) {
636   if(ctx->mmap_base) {
637     munmap(ctx->mmap_base, ctx->mmap_len);
638     ctx->mmap_base = NULL;
639     ctx->mmap_len = 0;
640   }
641   return 0;
642 }
643
644 static int __jlog_mmap_reader(jlog_ctx *ctx, u_int32_t log) {
645   if(ctx->current_log == log && ctx->mmap_base) return 0;
646   __jlog_open_reader(ctx, log);
647   if(!ctx->data)
648     return -1;
649   if (!jlog_file_map_read(ctx->data, &ctx->mmap_base, &ctx->mmap_len)) {
650     ctx->mmap_base = NULL;
651     ctx->last_error = JLOG_ERR_FILE_READ;
652     ctx->last_errno = errno;
653     return -1;
654   }
655   return 0;
656 }
657
658 static jlog_file *__jlog_open_writer(jlog_ctx *ctx) {
659   char file[MAXPATHLEN];
660
661   if(ctx->data) {
662     /* Still open */
663     return ctx->data;
664   }
665
666   if(!jlog_file_lock(ctx->metastore))
667     SYS_FAIL(JLOG_ERR_LOCK);
668   if(__jlog_restore_metastore(ctx, 1))
669     SYS_FAIL(JLOG_ERR_META_OPEN);
670   STRSETDATAFILE(ctx, file, ctx->storage.log);
671 #ifdef DEBUG
672   fprintf(stderr, "opening log file[rw]: '%s'\n", file);
673 #endif
674   ctx->data = jlog_file_open(file, O_CREAT, ctx->file_mode);
675  finish:
676   jlog_file_unlock(ctx->metastore);
677   return ctx->data;
678 }
679
680 static int __jlog_close_writer(jlog_ctx *ctx) {
681   if (ctx->data) {
682     jlog_file_close(ctx->data);
683     ctx->data = NULL;
684   }
685   return 0;
686 }
687
688 static int __jlog_close_reader(jlog_ctx *ctx) {
689   __jlog_munmap_reader(ctx);
690   if (ctx->data) {
691     jlog_file_close(ctx->data);
692     ctx->data = NULL;
693   }
694   return 0;
695 }
696
697 static int __jlog_close_checkpoint(jlog_ctx *ctx) {
698   if (ctx->checkpoint) {
699     jlog_file_close(ctx->checkpoint);
700     ctx->checkpoint = NULL;
701   }
702   return 0;
703 }
704
705 static jlog_file *__jlog_open_indexer(jlog_ctx *ctx, u_int32_t log) {
706   char file[MAXPATHLEN];
707
708   if(ctx->current_log != log) {
709     __jlog_close_reader(ctx);
710     __jlog_close_indexer(ctx);
711   }
712   if(ctx->index) {
713     return ctx->index;
714   }
715   STRSETDATAFILE(ctx, file, log);
716   strcat(file, INDEX_EXT);
717 #ifdef DEBUG
718   fprintf(stderr, "opening index file: '%s'\n", idx);
719 #endif
720   ctx->index = jlog_file_open(file, O_CREAT, ctx->file_mode);
721   ctx->current_log = log;
722   return ctx->index;
723 }
724
725 static int __jlog_close_indexer(jlog_ctx *ctx) {
726   if (ctx->index) {
727     jlog_file_close(ctx->index);
728     ctx->index = NULL;
729   }
730   return 0;
731 }
732
/*
 * Bring the index file of segment `log` up to date with its data file.
 *
 * The index is an array of u_int64_t data-file offsets, one per complete
 * record; a trailing entry of 0 marks the index as closed.  Starting from
 * the last offset already indexed, the data file is walked forward and the
 * offset of every complete record is appended.  When `log` is older than
 * the segment currently being written (ctx->storage.log), the index is then
 * closed with the 0 marker.
 *
 * last   - if non-NULL, receives { log, number of record entries indexed }
 * closed - if non-NULL, set to 1 when the index is (now) closed, else 0
 *
 * Returns 0 on success and -1 on failure (ctx->last_error is set).
 *
 * NOTE(review): SYS_FAIL is defined outside this file; it presumably
 * records the error on ctx and jumps to `finish` -- confirm.
 */
static int
___jlog_resync_index(jlog_ctx *ctx, u_int32_t log, jlog_id *last,
                     int *closed) {
  jlog_message_header logmhdr;
  int i, second_try = 0;
  off_t index_off, data_off, data_len;
  u_int64_t index;
  u_int64_t indices[BUFFERED_INDICES];   /* offsets waiting to be written */

  ctx->last_error = JLOG_ERR_SUCCESS;
  if(closed) *closed = 0;

  __jlog_open_reader(ctx, log);
  if (!ctx->data) {
    ctx->last_error = JLOG_ERR_FILE_OPEN;
    ctx->last_errno = errno;
    return -1;
  }

/* The index looks unusable.  The first time this happens, throw the index
 * away (truncate to 0), drop the lock and rebuild from scratch via
 * `restart`; if it happens again, give up with JLOG_ERR_IDX_CORRUPT. */
#define RESTART do { \
  if (second_try == 0) { \
    jlog_file_truncate(ctx->index, 0); \
    jlog_file_unlock(ctx->index); \
    second_try = 1; \
    ctx->last_error = JLOG_ERR_SUCCESS; \
    goto restart; \
  } \
  SYS_FAIL(JLOG_ERR_IDX_CORRUPT); \
} while (0)

restart:
  __jlog_open_indexer(ctx, log);
  if (!ctx->index) {
    ctx->last_error = JLOG_ERR_IDX_OPEN;
    ctx->last_errno = errno;
    return -1;
  }
  if (!jlog_file_lock(ctx->index)) {
    ctx->last_error = JLOG_ERR_LOCK;
    ctx->last_errno = errno;
    return -1;
  }

  data_off = 0;
  if ((data_len = jlog_file_size(ctx->data)) == -1)
    SYS_FAIL(JLOG_ERR_FILE_SEEK);
  if ((index_off = jlog_file_size(ctx->index)) == -1)
    SYS_FAIL(JLOG_ERR_IDX_SEEK);

  /* the index must consist of whole 64-bit entries */
  if (index_off % sizeof(u_int64_t)) {
#ifdef DEBUG
    fprintf(stderr, "corrupt index [%llu]\n", index_off);
#endif
    RESTART;
  }

  /* With more than one entry present, the last entry says where indexing
   * stopped (or, if it is 0, that the index is closed). */
  if (index_off > sizeof(u_int64_t)) {
    if (!jlog_file_pread(ctx->index, &index, sizeof(index),
                         index_off - sizeof(u_int64_t)))
    {
      SYS_FAIL(JLOG_ERR_IDX_READ);
    }
    if (index == 0) {
      /* This log file has been "closed" */
#ifdef DEBUG
      fprintf(stderr, "index closed\n");
#endif
      if(last) {
        last->log = log;
        /* the closing marker itself is not a record */
        last->marker = (index_off / sizeof(u_int64_t)) - 1;
      }
      if(closed) *closed = 1;
      goto finish;
    } else {
      if (index > data_len) {
#ifdef DEBUG
        fprintf(stderr, "index told me to seek somehwere I can't\n");
#endif
        RESTART;
      }
      data_off = index;
    }
  }

  if (index_off > 0) {
    /* We are adding onto a partial index so we must advance a record */
    if (!jlog_file_pread(ctx->data, &logmhdr, sizeof(logmhdr), data_off))
      SYS_FAIL(JLOG_ERR_FILE_READ);
    if ((data_off += sizeof(logmhdr) + logmhdr.mlen) > data_len)
      RESTART;
  }

  /* Roll forward over every complete record, collecting offsets in
   * `indices` and flushing them to the index in batches. */
  i = 0;
  while (data_off + sizeof(logmhdr) <= data_len) {
    off_t next_off = data_off;

    if (!jlog_file_pread(ctx->data, &logmhdr, sizeof(logmhdr), data_off))
      SYS_FAIL(JLOG_ERR_FILE_READ);
    /* an incomplete trailing record is not indexed */
    if ((next_off += sizeof(logmhdr) + logmhdr.mlen) > data_len)
      break;

    /* Write our new index offset */
    indices[i++] = data_off;
    if(i >= BUFFERED_INDICES) {
#ifdef DEBUG
      fprintf(stderr, "writing %i offsets\n", i);
#endif
      if (!jlog_file_pwrite(ctx->index, indices, i * sizeof(u_int64_t), index_off))
        RESTART;
      index_off += i * sizeof(u_int64_t);
      i = 0;
    }
    data_off = next_off;
  }
  /* flush whatever is left in the buffer */
  if(i > 0) {
#ifdef DEBUG
    fprintf(stderr, "writing %i offsets\n", i);
#endif
    if (!jlog_file_pwrite(ctx->index, indices, i * sizeof(u_int64_t), index_off))
      RESTART;
    index_off += i * sizeof(u_int64_t);
  }
  if(last) {
    last->log = log;
    last->marker = index_off / sizeof(u_int64_t);
  }
  /* a segment older than the current storage log can be closed */
  if(log < ctx->storage.log) {
    /* every byte of the data file must have been accounted for */
    if (data_off != data_len) {
#ifdef DEBUG
      fprintf(stderr, "closing index, but %llu != %llu\n", data_off, data_len);
#endif
      SYS_FAIL(JLOG_ERR_FILE_CORRUPT);
    }
    /* Special case: if we are closing, we next write a '0'
     * we can't write the closing marker if the data segment had no records
     * in it, since it will be confused with an index to offset 0 by the
     * next reader; this only happens when segments are repaired */
    if (index_off) {
      index = 0;
      if (!jlog_file_pwrite(ctx->index, &index, sizeof(u_int64_t), index_off))
        RESTART;
    }
    if(closed) *closed = 1;
  }
#undef RESTART

finish:
  jlog_file_unlock(ctx->index);
#ifdef DEBUG
  fprintf(stderr, "index is %s\n", closed?(*closed?"closed":"open"):"unknown");
#endif
  if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
  return -1;
}
887
888 static int __jlog_resync_index(jlog_ctx *ctx, u_int32_t log, jlog_id *last, int *closed) {
889   int attempts, rv = -1;
890   for(attempts=0; attempts<4; attempts++) {
891     rv = ___jlog_resync_index(ctx, log, last, closed);
892     if(ctx->last_error == JLOG_ERR_SUCCESS) break;
893     if(ctx->last_error == JLOG_ERR_FILE_OPEN ||
894        ctx->last_error == JLOG_ERR_IDX_OPEN) break;
895
896     /* We can't fix the file if someone may write to it again */
897     if(log >= ctx->storage.log) break;
898
899     jlog_file_lock(ctx->index);
900     /* it doesn't really matter what jlog_repair_datafile returns
901      * we'll keep retrying anyway */
902     jlog_repair_datafile(ctx, log);
903     jlog_file_truncate(ctx->index, 0);
904     jlog_file_unlock(ctx->index);
905   }
906   return rv;
907 }
908
909 jlog_ctx *jlog_new(const char *path) {
910   jlog_ctx *ctx;
911   ctx = calloc(1, sizeof(*ctx));
912   ctx->unit_limit = DEFAULT_UNIT_LIMIT;
913   ctx->file_mode = DEFAULT_FILE_MODE;
914   ctx->safety = DEFAULT_SAFETY;
915   ctx->context_mode = JLOG_NEW;
916   ctx->path = strdup(path);
917   return ctx;
918 }
919
920 void jlog_set_error_func(jlog_ctx *ctx, jlog_error_func Func, void *ptr) {
921   ctx->error_func = Func;
922   ctx->error_ctx = ptr;
923 }
924
925 size_t jlog_raw_size(jlog_ctx *ctx) {
926   DIR *d;
927   struct dirent *de;
928   size_t totalsize = 0;
929   int ferr, len;
930   char filename[MAXPATHLEN];
931
932   d = opendir(ctx->path);
933   if(!d) return 0;
934   len = strlen(ctx->path);
935   memcpy(filename, ctx->path, len);
936   filename[len++] = IFS_CH;
937   while((de = readdir(d)) != NULL) {
938     struct stat sb;
939     strcpy(filename+len, de->d_name);
940     while((ferr = stat(filename, &sb)) == -1 && errno == EINTR);
941     if(ferr == 0 && S_ISREG(sb.st_mode)) totalsize += sb.st_size;
942   }
943   closedir(d);
944   return totalsize;
945 }
946
/*
 * Return the symbolic name of the context's last error code as a string
 * literal (for example "JLOG_ERR_LOCK"), or "Unknown" for a code that is
 * not listed below.
 */
const char *jlog_ctx_err_string(jlog_ctx *ctx) {
  switch (ctx->last_error) {
/* expands to a case label that returns the constant's own name as text */
#define MSG_O_MATIC(x)  case x: return #x;
    MSG_O_MATIC( JLOG_ERR_SUCCESS);
    MSG_O_MATIC( JLOG_ERR_ILLEGAL_INIT);
    MSG_O_MATIC( JLOG_ERR_ILLEGAL_OPEN);
    MSG_O_MATIC( JLOG_ERR_OPEN);
    MSG_O_MATIC( JLOG_ERR_NOTDIR);
    MSG_O_MATIC( JLOG_ERR_CREATE_PATHLEN);
    MSG_O_MATIC( JLOG_ERR_CREATE_EXISTS);
    MSG_O_MATIC( JLOG_ERR_CREATE_MKDIR);
    MSG_O_MATIC( JLOG_ERR_CREATE_META);
    MSG_O_MATIC( JLOG_ERR_LOCK);
    MSG_O_MATIC( JLOG_ERR_IDX_OPEN);
    MSG_O_MATIC( JLOG_ERR_IDX_SEEK);
    MSG_O_MATIC( JLOG_ERR_IDX_CORRUPT);
    MSG_O_MATIC( JLOG_ERR_IDX_WRITE);
    MSG_O_MATIC( JLOG_ERR_IDX_READ);
    MSG_O_MATIC( JLOG_ERR_FILE_OPEN);
    MSG_O_MATIC( JLOG_ERR_FILE_SEEK);
    MSG_O_MATIC( JLOG_ERR_FILE_CORRUPT);
    MSG_O_MATIC( JLOG_ERR_FILE_READ);
    MSG_O_MATIC( JLOG_ERR_FILE_WRITE);
    MSG_O_MATIC( JLOG_ERR_META_OPEN);
    MSG_O_MATIC( JLOG_ERR_ILLEGAL_WRITE);
    MSG_O_MATIC( JLOG_ERR_ILLEGAL_CHECKPOINT);
    MSG_O_MATIC( JLOG_ERR_INVALID_SUBSCRIBER);
    MSG_O_MATIC( JLOG_ERR_ILLEGAL_LOGID);
    MSG_O_MATIC( JLOG_ERR_SUBSCRIBER_EXISTS);
    MSG_O_MATIC( JLOG_ERR_CHECKPOINT);
    MSG_O_MATIC( JLOG_ERR_NOT_SUPPORTED);
    default: return "Unknown";
  }
}
981
982 int jlog_ctx_err(jlog_ctx *ctx) {
983   return ctx->last_error;
984 }
985
986 int jlog_ctx_errno(jlog_ctx *ctx) {
987   return ctx->last_errno;
988 }
989
990 int jlog_ctx_alter_safety(jlog_ctx *ctx, jlog_safety safety) {
991   if(ctx->context_mode != JLOG_NEW) return -1;
992   ctx->safety = safety;
993   return 0;
994 }
995 int jlog_ctx_alter_journal_size(jlog_ctx *ctx, size_t size) {
996   if(ctx->context_mode != JLOG_NEW) return -1;
997   ctx->unit_limit = size;
998   return 0;
999 }
1000 int jlog_ctx_alter_mode(jlog_ctx *ctx, int mode) {
1001   ctx->file_mode = mode;
1002   return 0;
1003 }
1004 int jlog_ctx_open_writer(jlog_ctx *ctx) {
1005   int rv;
1006   struct stat sb;
1007
1008   ctx->last_error = JLOG_ERR_SUCCESS;
1009   if(ctx->context_mode != JLOG_NEW) {
1010     ctx->last_error = JLOG_ERR_ILLEGAL_OPEN;
1011     return -1;
1012   }
1013   ctx->context_mode = JLOG_APPEND;
1014   while((rv = stat(ctx->path, &sb)) == -1 && errno == EINTR);
1015   if(rv == -1) SYS_FAIL(JLOG_ERR_OPEN);
1016   if(!S_ISDIR(sb.st_mode)) SYS_FAIL(JLOG_ERR_NOTDIR);
1017   if(__jlog_open_metastore(ctx) != 0) SYS_FAIL(JLOG_ERR_META_OPEN);
1018   if(__jlog_restore_metastore(ctx, 0)) SYS_FAIL(JLOG_ERR_META_OPEN);
1019  finish:
1020   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1021   ctx->context_mode = JLOG_INVALID;
1022   return -1;
1023 }
1024 int jlog_ctx_open_reader(jlog_ctx *ctx, const char *subscriber) {
1025   int rv;
1026   struct stat sb;
1027   jlog_id dummy;
1028
1029   ctx->last_error = JLOG_ERR_SUCCESS;
1030   if(ctx->context_mode != JLOG_NEW) {
1031     ctx->last_error = JLOG_ERR_ILLEGAL_OPEN;
1032     return -1;
1033   }
1034   ctx->context_mode = JLOG_READ;
1035   ctx->subscriber_name = strdup(subscriber);
1036   while((rv = stat(ctx->path, &sb)) == -1 && errno == EINTR);
1037   if(rv == -1) SYS_FAIL(JLOG_ERR_OPEN);
1038   if(!S_ISDIR(sb.st_mode)) SYS_FAIL(JLOG_ERR_NOTDIR);
1039   if(__jlog_open_metastore(ctx) != 0) SYS_FAIL(JLOG_ERR_META_OPEN);
1040   if(jlog_get_checkpoint(ctx, ctx->subscriber_name, &dummy))
1041     SYS_FAIL(JLOG_ERR_INVALID_SUBSCRIBER);
1042   if(__jlog_restore_metastore(ctx, 0)) SYS_FAIL(JLOG_ERR_META_OPEN);
1043  finish:
1044   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1045   ctx->context_mode = JLOG_INVALID;
1046   return -1;
1047 }
1048 int jlog_ctx_init(jlog_ctx *ctx) {
1049   int rv;
1050   struct stat sb;
1051   int dirmode;
1052
1053   ctx->last_error = JLOG_ERR_SUCCESS;
1054   if(strlen(ctx->path) > MAXLOGPATHLEN-1) {
1055     ctx->last_error = JLOG_ERR_CREATE_PATHLEN;
1056     return -1;
1057   }
1058   if(ctx->context_mode != JLOG_NEW) {
1059     ctx->last_error = JLOG_ERR_ILLEGAL_INIT;
1060     return -1;
1061   }
1062   ctx->context_mode = JLOG_INIT;
1063   while((rv = stat(ctx->path, &sb)) == -1 && errno == EINTR);
1064   if(rv == 0 || errno != ENOENT) {
1065     SYS_FAIL_EX(JLOG_ERR_CREATE_EXISTS, 0);
1066   }
1067   dirmode = ctx->file_mode;
1068   if(dirmode & 0400) dirmode |= 0100;
1069   if(dirmode & 040) dirmode |= 010;
1070   if(dirmode & 04) dirmode |= 01;
1071   if(mkdir(ctx->path, dirmode) == -1)
1072     SYS_FAIL(JLOG_ERR_CREATE_MKDIR);
1073   chmod(ctx->path, dirmode);
1074   /* Setup our initial state and store our instance metadata */
1075   if(__jlog_open_metastore(ctx) != 0)
1076     SYS_FAIL(JLOG_ERR_CREATE_META);
1077   if(__jlog_save_metastore(ctx, 0) != 0)
1078     SYS_FAIL(JLOG_ERR_CREATE_META);
1079  finish:
1080   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1081   return -1;
1082 }
1083 int jlog_ctx_close(jlog_ctx *ctx) {
1084   __jlog_close_indexer(ctx);
1085   __jlog_close_reader(ctx);
1086   __jlog_close_metastore(ctx);
1087   __jlog_close_checkpoint(ctx);
1088   if(ctx->subscriber_name) free(ctx->subscriber_name);
1089   if(ctx->path) free(ctx->path);
1090   free(ctx);
1091   return 0;
1092 }
1093
1094 static int __jlog_metastore_atomic_increment(jlog_ctx *ctx) {
1095   u_int32_t saved_storage_log = ctx->storage.log;
1096 #ifdef DEBUG
1097   fprintf(stderr, "atomic increment on %u\n", saved_storage_log);
1098 #endif
1099   if (!jlog_file_lock(ctx->metastore))
1100     SYS_FAIL(JLOG_ERR_LOCK);
1101   if(__jlog_restore_metastore(ctx, 1))
1102     SYS_FAIL(JLOG_ERR_META_OPEN);
1103   if(ctx->storage.log == saved_storage_log) {
1104     /* We're the first ones to it, so we get to increment it */
1105     ctx->storage.log++;
1106     if(__jlog_save_metastore(ctx, 1))
1107       SYS_FAIL(JLOG_ERR_META_OPEN);
1108   }
1109  finish:
1110   jlog_file_unlock(ctx->metastore);
1111   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1112   return -1;
1113 }
1114 int jlog_ctx_write_message(jlog_ctx *ctx, jlog_message *mess, struct timeval *when) {
1115   struct timeval now;
1116   jlog_message_header hdr;
1117   off_t current_offset;
1118
1119   ctx->last_error = JLOG_ERR_SUCCESS;
1120   if(ctx->context_mode != JLOG_APPEND) {
1121     ctx->last_error = JLOG_ERR_ILLEGAL_WRITE;
1122     ctx->last_errno = EPERM;
1123     return -1;
1124   }
1125  begin:
1126   __jlog_open_writer(ctx);
1127   if(!ctx->data) {
1128     ctx->last_error = JLOG_ERR_FILE_OPEN;
1129     ctx->last_errno = errno;
1130     return -1;
1131   }
1132   if (!jlog_file_lock(ctx->data)) {
1133     ctx->last_error = JLOG_ERR_LOCK;
1134     ctx->last_errno = errno;
1135     return -1;
1136   }
1137
1138   if ((current_offset = jlog_file_size(ctx->data)) == -1)
1139     SYS_FAIL(JLOG_ERR_FILE_SEEK);
1140   if(ctx->unit_limit <= current_offset) {
1141     jlog_file_unlock(ctx->data);
1142     __jlog_close_writer(ctx);
1143     __jlog_metastore_atomic_increment(ctx);
1144     goto begin;
1145   }
1146
1147   hdr.reserved = 0;
1148   if (when) {
1149     hdr.tv_sec = when->tv_sec;
1150     hdr.tv_usec = when->tv_usec;
1151   } else {
1152     gettimeofday(&now, NULL);
1153     hdr.tv_sec = now.tv_sec;
1154     hdr.tv_usec = now.tv_usec;
1155   }
1156   hdr.mlen = mess->mess_len;
1157   if (!jlog_file_pwrite(ctx->data, &hdr, sizeof(hdr), current_offset))
1158     SYS_FAIL(JLOG_ERR_FILE_WRITE);
1159   current_offset += sizeof(hdr);
1160   if (!jlog_file_pwrite(ctx->data, mess->mess, mess->mess_len, current_offset))
1161     SYS_FAIL(JLOG_ERR_FILE_WRITE);
1162   current_offset += mess->mess_len;
1163
1164   if(ctx->unit_limit <= current_offset) {
1165     jlog_file_unlock(ctx->data);
1166     __jlog_close_writer(ctx);
1167     __jlog_metastore_atomic_increment(ctx);
1168     return 0;
1169   }
1170  finish:
1171   jlog_file_unlock(ctx->data);
1172   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1173   return -1;
1174 }
1175 int jlog_ctx_read_checkpoint(jlog_ctx *ctx, const jlog_id *chkpt) {
1176   ctx->last_error = JLOG_ERR_SUCCESS;
1177  
1178   if(ctx->context_mode != JLOG_READ) {
1179     ctx->last_error = JLOG_ERR_ILLEGAL_CHECKPOINT;
1180     ctx->last_errno = EPERM;
1181     return -1;
1182   }
1183   if(__jlog_set_checkpoint(ctx, ctx->subscriber_name, chkpt) != 0) {
1184     ctx->last_error = JLOG_ERR_CHECKPOINT;
1185     ctx->last_errno = 0;
1186     return -1;
1187   }
1188   return 0;
1189 }
1190
1191 int jlog_ctx_remove_subscriber(jlog_ctx *ctx, const char *s) {
1192   char name[MAXPATHLEN];
1193   int rv;
1194
1195   compute_checkpoint_filename(ctx, s, name);
1196   rv = unlink(name);
1197
1198   if (rv == 0) {
1199     ctx->last_error = JLOG_ERR_SUCCESS;
1200     return 1;
1201   }
1202   if (errno == ENOENT) {
1203     ctx->last_error = JLOG_ERR_INVALID_SUBSCRIBER;
1204     return 0;
1205   }
1206   return -1;
1207 }
1208
1209 int jlog_ctx_add_subscriber(jlog_ctx *ctx, const char *s, jlog_position whence) {
1210   jlog_id chkpt;
1211   jlog_file *jchkpt;
1212   ctx->last_error = JLOG_ERR_SUCCESS;
1213
1214   jchkpt = __jlog_open_named_checkpoint(ctx, s, O_CREAT|O_EXCL);
1215   if(!jchkpt) {
1216     ctx->last_error = JLOG_ERR_SUBSCRIBER_EXISTS;
1217     ctx->last_errno = EEXIST;
1218     return -1;
1219   }
1220   jlog_file_close(jchkpt);
1221  
1222   if(whence == JLOG_BEGIN) {
1223     memset(&chkpt, 0, sizeof(chkpt));
1224     if(__jlog_set_checkpoint(ctx, s, &chkpt) != 0) {
1225       ctx->last_error = JLOG_ERR_CHECKPOINT;
1226       ctx->last_errno = 0;
1227       return -1;
1228     }
1229     return 0;
1230   }
1231   ctx->last_error = JLOG_ERR_NOT_SUPPORTED;
1232   return -1;
1233 }
1234
1235 int jlog_ctx_write(jlog_ctx *ctx, const void *data, size_t len) {
1236   jlog_message m;
1237   m.mess = (void *)data;
1238   m.mess_len = len;
1239   return jlog_ctx_write_message(ctx, &m, NULL);
1240 }
1241
1242 static int __jlog_find_first_log_after(jlog_ctx *ctx, jlog_id *chkpt,
1243                                 jlog_id *start, jlog_id *finish) {
1244   jlog_id last;
1245   int closed;
1246  
1247   memcpy(start, chkpt, sizeof(*chkpt));
1248  attempt:
1249   if(__jlog_resync_index(ctx, start->log, &last, &closed) != 0) {
1250     if(ctx->last_error == JLOG_ERR_FILE_OPEN &&
1251        ctx->last_errno == ENOENT) {
1252       /* That file doesn't exist... bad, but we can fake a recovery by
1253          advancing the next file that does exist */
1254       ctx->last_error = JLOG_ERR_SUCCESS;
1255       if(start->log >= ctx->storage.log) {
1256         /* We don't advance past where people are writing */
1257         memcpy(finish, start, sizeof(*start));
1258         return 0;
1259       }
1260       start->marker = 0;
1261       start->log++;  /* BE SMARTER! */
1262       goto attempt;
1263     }
1264     return -1; /* Just persist resync's error state */
1265   }
1266
1267   /* If someone checkpoints off the end, be nice */
1268   if(last.log == start->log && last.marker < start->marker)
1269     memcpy(start, &last, sizeof(*start));
1270
1271   if(!memcmp(start, &last, sizeof(last)) && closed) {
1272     /* the chkpt is the last of the index and the index is closed...
1273        next please */
1274 #ifdef DEBUG
1275     fprintf(stderr, "checkpoint at end of file... advancing [%08x]\n",
1276             start->log+1);
1277 #endif
1278     if(start->log >= ctx->storage.log) {
1279       /* We don't advance past where people are writing */
1280       memcpy(finish, start, sizeof(*start));
1281       return 0;
1282     }
1283     start->marker = 0;
1284     start->log++;
1285     goto attempt;
1286   }
1287   memcpy(finish, &last, sizeof(last));
1288   return 0;
1289 }
1290 int jlog_ctx_read_message(jlog_ctx *ctx, const jlog_id *id, jlog_message *m) {
1291   off_t index_len;
1292   u_int64_t data_off;
1293
1294   ctx->last_error = JLOG_ERR_SUCCESS;
1295   if (ctx->context_mode != JLOG_READ)
1296     SYS_FAIL(JLOG_ERR_ILLEGAL_WRITE);
1297   if (id->marker < 1)
1298     SYS_FAIL(JLOG_ERR_ILLEGAL_LOGID);
1299
1300   __jlog_open_reader(ctx, id->log);
1301   if(!ctx->data)
1302     SYS_FAIL(JLOG_ERR_FILE_OPEN);
1303   __jlog_open_indexer(ctx, id->log);
1304   if(!ctx->index)
1305     SYS_FAIL(JLOG_ERR_IDX_OPEN);
1306
1307   if ((index_len = jlog_file_size(ctx->index)) == -1)
1308     SYS_FAIL(JLOG_ERR_IDX_SEEK);
1309   if (index_len % sizeof(u_int64_t))
1310     SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
1311   if (id->marker * sizeof(u_int64_t) > index_len)
1312     SYS_FAIL(JLOG_ERR_ILLEGAL_LOGID);
1313
1314   if (!jlog_file_pread(ctx->index, &data_off, sizeof(u_int64_t),
1315                        (id->marker - 1) * sizeof(u_int64_t)))
1316   {
1317     SYS_FAIL(JLOG_ERR_IDX_READ);
1318   }
1319   if (data_off == 0 && id->marker != 1) {
1320     if (id->marker * sizeof(u_int64_t) == index_len) {
1321       /* close tag; not a real offset */
1322       SYS_FAIL(JLOG_ERR_ILLEGAL_LOGID);
1323     } else {
1324       /* an offset of 0 in the middle of an index means curruption */
1325       SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
1326     }
1327   }
1328
1329   if(__jlog_mmap_reader(ctx, id->log) != 0)
1330     SYS_FAIL(JLOG_ERR_FILE_READ);
1331
1332   if(data_off > ctx->mmap_len - sizeof(jlog_message_header)) {
1333 #ifdef DEBUG
1334     fprintf(stderr, "read idx off end: %llu\n", data_off);
1335 #endif
1336     SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
1337   }
1338
1339   memcpy(&m->aligned_header, ((u_int8_t *)ctx->mmap_base) + data_off,
1340          sizeof(jlog_message_header));
1341
1342   if(data_off + sizeof(jlog_message_header) + m->aligned_header.mlen > ctx->mmap_len) {
1343 #ifdef DEBUG
1344     fprintf(stderr, "read idx off end: %llu %llu\n", data_off, ctx->mmap_len);
1345 #endif
1346     SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
1347   }
1348
1349   m->header = &m->aligned_header;
1350   m->mess_len = m->header->mlen;
1351   m->mess = (((u_int8_t *)ctx->mmap_base) + data_off + sizeof(jlog_message_header));
1352
1353  finish:
1354   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1355   if (ctx->last_error == JLOG_ERR_IDX_CORRUPT) {
1356     if (jlog_file_lock(ctx->index)) {
1357       jlog_file_truncate(ctx->index, 0);
1358       jlog_file_unlock(ctx->index);
1359     }
1360   }
1361   return -1;
1362 }
1363 int jlog_ctx_read_interval(jlog_ctx *ctx, jlog_id *start, jlog_id *finish) {
1364   jlog_id chkpt;
1365   int count = 0;
1366
1367   ctx->last_error = JLOG_ERR_SUCCESS;
1368   if(ctx->context_mode != JLOG_READ) {
1369     ctx->last_error = JLOG_ERR_ILLEGAL_WRITE;
1370     ctx->last_errno = EPERM;
1371     return -1;
1372   }
1373
1374   __jlog_restore_metastore(ctx, 0);
1375   if(jlog_get_checkpoint(ctx, ctx->subscriber_name, &chkpt))
1376     SYS_FAIL(JLOG_ERR_INVALID_SUBSCRIBER);
1377   if(__jlog_find_first_log_after(ctx, &chkpt, start, finish) != 0)
1378     goto finish; /* Leave whatever error was set in find_first_log_after */
1379   if(start->log != chkpt.log) start->marker = 0;
1380   else start->marker = chkpt.marker;
1381   if(start->log != chkpt.log) {
1382     /* We've advanced our checkpoint, let's not do this work again */
1383     if(__jlog_set_checkpoint(ctx, ctx->subscriber_name, start) != 0)
1384       SYS_FAIL(JLOG_ERR_CHECKPOINT);
1385   }
1386   /* Here 'start' is actually the checkpoint, so we must advance it one.
1387      However, that may not be possible, if there are no messages, so first
1388      make sure finish is bigger */
1389   count = finish->marker - start->marker;
1390   if(finish->marker > start->marker) start->marker++;
1391
1392   /* We need to munmap it, so that we can remap it with more data if needed */
1393   __jlog_munmap_reader(ctx);
1394  finish:
1395   if(ctx->last_error == JLOG_ERR_SUCCESS) return count;
1396   return -1;
1397 }
1398
1399 int jlog_ctx_last_log_id(jlog_ctx *ctx, jlog_id *id) {
1400   ctx->last_error = JLOG_ERR_SUCCESS;
1401   if(ctx->context_mode != JLOG_READ) {
1402     ctx->last_error = JLOG_ERR_ILLEGAL_WRITE;
1403     ctx->last_errno = EPERM;
1404     return -1;
1405   }
1406   if (__jlog_restore_metastore(ctx, 0) != 0) return -1;
1407   ___jlog_resync_index(ctx, ctx->storage.log, id, NULL);
1408   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1409   return -1;
1410 }
1411
1412 /* vim:se ts=2 sw=2 et: */
Note: See TracBrowser for help on using the browser.