root/jlog.c

Revision b0a67cd1467f97fc302fde45d10a68dd636220d1, 47.8 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 3 weeks ago)

backout an overaggressive truncate in d5413213

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2005-2008, Message Systems, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *    * Redistributions of source code must retain the above copyright
10  *      notice, this list of conditions and the following disclaimer.
11  *    * Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials provided
14  *      with the distribution.
15  *    * Neither the name Message Systems, Inc. nor the names
16  *      of its contributors may be used to endorse or promote products
17  *      derived from this software without specific prior written
18  *      permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 /*****************************************************************
34
35   Journaled logging... append only.
36
37       (1) find current file, or allocate a file, extendible and mark
38           it current.
39  
40       (2) Write records to it, records include their size, so
41           a simple inspection can detect an incomplete trailing
42           record.
43    
44       (3) Write append until the file reaches a certain size.
45
46       (4) Allocate a file, extensible.
47
48       (5) RESYNC INDEX on 'finished' file (see reading:3) and postpend
49           an offset '0' to the index.
50    
51       (6) goto (1)
52    
53   Reading journals...
54
55       (1) find oldest checkpoint of all subscribers, remove all older files.
56
57       (2) (file, last_read) = find_checkpoint for this subscriber
58
59       (3) RESYNC INDEX:
60           open record index for file, seek to the end -  off_t.
61           this is the offset of the last noticed record in this file.
62           open file, seek to this point, roll forward writing the index file
63           _do not_ write an offset for the last record unless it is found
64           complete.
65
66       (4) read entries from last_read+1 -> index of record index
67
68 */
69 #include <stdio.h>
70
71 #include "jlog_config.h"
72 #include "jlog_private.h"
73 #if HAVE_UNISTD_H
74 #include <unistd.h>
75 #endif
76 #if HAVE_SYS_TIME_H
77 #include <sys/time.h>
78 #endif
79 #if HAVE_DIRENT_H
80 #include <dirent.h>
81 #endif
82 #if HAVE_FCNTL_H
83 #include <fcntl.h>
84 #endif
85 #if HAVE_ERRNO_H
86 #include <errno.h>
87 #endif
88 #if HAVE_TIME_H
89 #include <time.h>
90 #endif
91 #if HAVE_SYS_MMAN_H
92 #include <sys/mman.h>
93 #endif
94
95 #define BUFFERED_INDICES 1024
96
97 static jlog_file *__jlog_open_writer(jlog_ctx *ctx);
98 static int __jlog_close_writer(jlog_ctx *ctx);
99 static jlog_file *__jlog_open_reader(jlog_ctx *ctx, u_int32_t log);
100 static int __jlog_close_reader(jlog_ctx *ctx);
101 static int __jlog_close_checkpoint(jlog_ctx *ctx);
102 static jlog_file *__jlog_open_indexer(jlog_ctx *ctx, u_int32_t log);
103 static int __jlog_close_indexer(jlog_ctx *ctx);
104 static int __jlog_resync_index(jlog_ctx *ctx, u_int32_t log, jlog_id *last, int *c);
105 static jlog_file *__jlog_open_named_checkpoint(jlog_ctx *ctx, const char *cpname, int flags);
106 static int __jlog_mmap_reader(jlog_ctx *ctx, u_int32_t log);
107 static int __jlog_munmap_reader(jlog_ctx *ctx);
108
109 int jlog_snprint_logid(char *b, int n, const jlog_id *id) {
110   return snprintf(b, n, "%08x:%08x", id->log, id->marker);
111 }
112
113 int jlog_repair_datafile(jlog_ctx *ctx, u_int32_t log)
114 {
115   jlog_message_header hdr;
116   char *this, *next, *afternext = NULL, *mmap_end;
117   int i, invalid_count = 0;
118   struct {
119     off_t start, end;
120   } *invalid = NULL;
121   off_t orig_len, src, dst, len;
122
123 #define TAG_INVALID(s, e) do { \
124   if (invalid_count) \
125     invalid = realloc(invalid, (invalid_count + 1) * sizeof(*invalid)); \
126   else \
127     invalid = malloc(sizeof(*invalid)); \
128   invalid[invalid_count].start = s - (char *)ctx->mmap_base; \
129   invalid[invalid_count].end = e - (char *)ctx->mmap_base; \
130   invalid_count++; \
131 } while (0)
132
133   ctx->last_error = JLOG_ERR_SUCCESS;
134
135   /* we want the reader's open logic because this runs in the read path
136    * the underlying fds are always RDWR anyway */
137   __jlog_open_reader(ctx, log);
138   if (!ctx->data) {
139     ctx->last_error = JLOG_ERR_FILE_OPEN;
140     ctx->last_errno = errno;
141     return -1;
142   }
143   if (!jlog_file_lock(ctx->data)) {
144     ctx->last_error = JLOG_ERR_LOCK;
145     ctx->last_errno = errno;
146     return -1;
147   }
148   if (__jlog_mmap_reader(ctx, log) != 0)
149     SYS_FAIL(JLOG_ERR_FILE_READ);
150
151   orig_len = ctx->mmap_len;
152   mmap_end = (char*)ctx->mmap_base + ctx->mmap_len;
153   /* these values will cause us to fall right into the error clause and
154    * start searching for a valid header from offset 0 */
155   this = (char*)ctx->mmap_base - sizeof(hdr);
156   hdr.reserved = ctx->meta->hdr_magic;
157   hdr.mlen = 0;
158
159   while (this + sizeof(hdr) <= mmap_end) {
160     next = this + sizeof(hdr) + hdr.mlen;
161     if (next <= (char *)ctx->mmap_base) goto error;
162     if (next == mmap_end) {
163       this = next;
164       break;
165     }
166     if (next + sizeof(hdr) > mmap_end) goto error;
167     memcpy(&hdr, next, sizeof(hdr));
168     if (hdr.reserved != ctx->meta->hdr_magic) goto error;
169     this = next;
170     continue;
171   error:
172     for (next = this + sizeof(hdr); next + sizeof(hdr) <= mmap_end; next++) {
173       if (!next[0] && !next[1] && !next[2] && !next[3]) {
174         memcpy(&hdr, next, sizeof(hdr));
175         afternext = next + sizeof(hdr) + hdr.mlen;
176         if (afternext <= (char *)ctx->mmap_base) continue;
177         if (afternext == mmap_end) break;
178         if (afternext + sizeof(hdr) > mmap_end) continue;
179         memcpy(&hdr, afternext, sizeof(hdr));
180         if (hdr.reserved == ctx->meta->hdr_magic) break;
181       }
182     }
183     /* correct for while loop entry condition */
184     if (this < (char *)ctx->mmap_base) this = ctx->mmap_base;
185     if (next + sizeof(hdr) > mmap_end) break;
186     if (next > this) TAG_INVALID(this, next);
187     this = afternext;
188   }
189   if (this != mmap_end) TAG_INVALID(this, mmap_end);
190
191 #undef TAG_INVALID
192
193 #define MOVE_SEGMENT do { \
194   char cpbuff[4096]; \
195   off_t chunk; \
196   while(len > 0) { \
197     chunk = len; \
198     if (chunk > sizeof(cpbuff)) chunk = sizeof(cpbuff); \
199     if (!jlog_file_pread(ctx->data, &cpbuff, chunk, src)) \
200       SYS_FAIL(JLOG_ERR_FILE_READ); \
201     if (!jlog_file_pwrite(ctx->data, &cpbuff, chunk, dst)) \
202       SYS_FAIL(JLOG_ERR_FILE_WRITE); \
203     src += chunk; \
204     dst += chunk; \
205     len -= chunk; \
206   } \
207 } while (0)
208
209   if (invalid_count > 0) {
210     __jlog_munmap_reader(ctx);
211     dst = invalid[0].start;
212     for (i = 0; i < invalid_count - 1; ) {
213       src = invalid[i].end;
214       len = invalid[++i].start - src;
215       MOVE_SEGMENT;
216     }
217     src = invalid[invalid_count - 1].end;
218     len = orig_len - src;
219     if (len > 0) MOVE_SEGMENT;
220     if (!jlog_file_truncate(ctx->data, dst))
221       SYS_FAIL(JLOG_ERR_FILE_WRITE);
222   }
223
224 #undef MOVE_SEGMENT
225
226 finish:
227   jlog_file_unlock(ctx->data);
228   if (invalid) free(invalid);
229   if (ctx->last_error != JLOG_ERR_SUCCESS) return -1;
230   return invalid_count;
231 }
232
233 int jlog_inspect_datafile(jlog_ctx *ctx, u_int32_t log, int verbose)
234 {
235   jlog_message_header hdr;
236   char *this, *next, *mmap_end;
237   int i;
238   time_t timet;
239   struct tm tm;
240   char tbuff[128];
241
242   ctx->last_error = JLOG_ERR_SUCCESS;
243
244   __jlog_open_reader(ctx, log);
245   if (!ctx->data)
246     SYS_FAIL(JLOG_ERR_FILE_OPEN);
247   if (__jlog_mmap_reader(ctx, log) != 0)
248     SYS_FAIL(JLOG_ERR_FILE_READ);
249
250   mmap_end = (char*)ctx->mmap_base + ctx->mmap_len;
251   this = ctx->mmap_base;
252   i = 0;
253   while (this + sizeof(hdr) <= mmap_end) {
254     int initial = 1;
255     memcpy(&hdr, this, sizeof(hdr));
256     i++;
257     if (hdr.reserved != ctx->meta->hdr_magic) {
258       fprintf(stderr, "Message %d at [%ld] has invalid reserved value %u\n",
259               i, (long int)(this - (char *)ctx->mmap_base), hdr.reserved);
260       return 1;
261     }
262
263 #define PRINTMSGHDR do { if(initial) { \
264   fprintf(stderr, "Message %d at [%ld] of (%lu+%u)", i, \
265           (long int)(this - (char *)ctx->mmap_base), \
266           (long unsigned int)sizeof(hdr), hdr.mlen); \
267   initial = 0; \
268 } } while(0)
269
270     if(verbose) {
271       PRINTMSGHDR;
272     }
273
274     next = this + sizeof(hdr) + hdr.mlen;
275     if (next <= (char *)ctx->mmap_base) {
276       PRINTMSGHDR;
277       fprintf(stderr, " WRAPPED TO NEGATIVE OFFSET!\n");
278       return 1;
279     }
280     if (next > mmap_end) {
281       PRINTMSGHDR;
282       fprintf(stderr, " OFF THE END!\n");
283       return 1;
284     }
285
286     timet = hdr.tv_sec;
287     localtime_r(&timet, &tm);
288     strftime(tbuff, sizeof(tbuff), "%c", &tm);
289     if(verbose) fprintf(stderr, "\n\ttime: %s\n\tmlen: %u\n", tbuff, hdr.mlen);
290     this = next;
291   }
292   if (this < mmap_end) {
293     fprintf(stderr, "%ld bytes of junk at the end\n",
294             (long int)(mmap_end - this));
295     return 1;
296   }
297
298   return 0;
299 finish:
300   return -1;
301 }
302
303 int jlog_idx_details(jlog_ctx *ctx, u_int32_t log,
304                      u_int32_t *marker, int *closed)
305 {
306   off_t index_len;
307   u_int64_t index;
308
309   __jlog_open_indexer(ctx, log);
310   if (!ctx->index)
311     SYS_FAIL(JLOG_ERR_IDX_OPEN);
312   if ((index_len = jlog_file_size(ctx->index)) == -1)
313     SYS_FAIL(JLOG_ERR_IDX_SEEK);
314   if (index_len % sizeof(u_int64_t))
315     SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
316   if (index_len > sizeof(u_int64_t)) {
317     if (!jlog_file_pread(ctx->index, &index, sizeof(u_int64_t),
318                          index_len - sizeof(u_int64_t)))
319     {
320       SYS_FAIL(JLOG_ERR_IDX_READ);
321     }
322     if (index) {
323       *marker = index_len / sizeof(u_int64_t);
324       *closed = 0;
325     } else {
326       *marker = (index_len / sizeof(u_int64_t)) - 1;
327       *closed = 1;
328     }
329   } else {
330     *marker = index_len / sizeof(u_int64_t);
331     *closed = 0;
332   }
333
334   return 0;
335 finish:
336   return -1;
337 }
338
339 static int __jlog_unlink_datafile(jlog_ctx *ctx, u_int32_t log) {
340   char file[MAXPATHLEN];
341   int len;
342
343   if(ctx->current_log == log) {
344     __jlog_close_reader(ctx);
345     __jlog_close_indexer(ctx);
346   }
347
348   STRSETDATAFILE(ctx, file, log);
349 #ifdef DEBUG
350   fprintf(stderr, "unlinking %s\n", file);
351 #endif
352   unlink(file);
353
354   len = strlen(file);
355   if((len + sizeof(INDEX_EXT)) > sizeof(file)) return -1;
356   memcpy(file + len, INDEX_EXT, sizeof(INDEX_EXT));
357 #ifdef DEBUG
358   fprintf(stderr, "unlinking %s\n", file);
359 #endif
360   unlink(file);
361   return 0;
362 }
363
364 static int __jlog_open_metastore(jlog_ctx *ctx)
365 {
366   char file[MAXPATHLEN];
367   int len;
368
369 #ifdef DEBUG
370   fprintf(stderr, "__jlog_open_metastore\n");
371 #endif
372   len = strlen(ctx->path);
373   if((len + 1 /* IFS_CH */ + 9 /* "metastore" */ + 1) > MAXPATHLEN) {
374 #ifdef ENAMETOOLONG
375     ctx->last_errno = ENAMETOOLONG;
376 #endif
377     ctx->last_error = JLOG_ERR_CREATE_META;
378     return -1;
379   }
380   memcpy(file, ctx->path, len);
381   file[len++] = IFS_CH;
382   memcpy(&file[len], "metastore", 10); /* "metastore" + '\0' */
383
384   ctx->metastore = jlog_file_open(file, O_CREAT, ctx->file_mode);
385
386   if (!ctx->metastore) {
387     ctx->last_errno = errno;
388     ctx->last_error = JLOG_ERR_CREATE_META;
389     return -1;
390   }
391
392   return 0;
393 }
394
395 /* exported */
396 int __jlog_pending_readers(jlog_ctx *ctx, u_int32_t log) {
397   return jlog_pending_readers(ctx, log, NULL);
398 }
399 int jlog_pending_readers(jlog_ctx *ctx, u_int32_t log,
400                          u_int32_t *earliest_out) {
401   int readers;
402   DIR *dir;
403   struct dirent *ent;
404   char file[MAXPATHLEN];
405   int len, seen = 0;
406   u_int32_t earliest = 0;
407   jlog_id id;
408
409   readers = 0;
410
411   dir = opendir(ctx->path);
412   if (!dir) return -1;
413
414   len = strlen(ctx->path);
415   if(len + 2 > sizeof(file)) {
416     closedir(dir);
417     return -1;
418   }
419   memcpy(file, ctx->path, len);
420   file[len++] = IFS_CH;
421   file[len] = '\0';
422
423   while ((ent = readdir(dir))) {
424     if (ent->d_name[0] == 'c' && ent->d_name[1] == 'p' && ent->d_name[2] == '.') {
425       jlog_file *cp;
426       int dlen;
427
428       dlen = strlen(ent->d_name);
429       if((len + dlen + 1) > sizeof(file)) continue;
430       memcpy(file + len, ent->d_name, dlen + 1); /* include \0 */
431 #ifdef DEBUG
432       fprintf(stderr, "Checking if %s needs %s...\n", ent->d_name, ctx->path);
433 #endif
434       if ((cp = jlog_file_open(file, 0, ctx->file_mode))) {
435         if (jlog_file_lock(cp)) {
436           (void) jlog_file_pread(cp, &id, sizeof(id), 0);
437 #ifdef DEBUG
438           fprintf(stderr, "\t%u <= %u (pending reader)\n", id.log, log);
439 #endif
440           if (!seen) {
441             earliest = id.log;
442             seen = 1;
443           }
444           else {
445             if(id.log < earliest) {
446               earliest = id.log;
447             }
448           }
449           if (id.log <= log) {
450             readers++;
451           }
452           jlog_file_unlock(cp);
453         }
454         jlog_file_close(cp);
455       }
456     }
457   }
458   closedir(dir);
459   if(earliest_out) *earliest_out = earliest;
460   return readers;
461 }
/* Growable, NULL-terminated array of subscriber names. */
struct _jlog_subs {
  char **subs;       /* the array of heap-allocated name strings */
  int used, allocd;  /* entries filled in / entries allocated */
};
467
468 int jlog_ctx_list_subscribers_dispose(jlog_ctx *ctx, char **subs) {
469   char *s;
470   int i = 0;
471   if(subs) {
472     while((s = subs[i++]) != NULL) free(s);
473     free(subs);
474   }
475   return 0;
476 }
477
478 int jlog_ctx_list_subscribers(jlog_ctx *ctx, char ***subs) {
479   struct _jlog_subs js = { NULL, 0, 0 };
480   DIR *dir;
481   struct dirent *ent;
482   unsigned char file[MAXPATHLEN];
483   char *p;
484   int len;
485
486   js.subs = calloc(16, sizeof(char *));
487   js.allocd = 16;
488
489   dir = opendir(ctx->path);
490   if (!dir) return -1;
491   while ((ent = readdir(dir))) {
492     if (ent->d_name[0] == 'c' && ent->d_name[1] == 'p' && ent->d_name[2] == '.') {
493
494       for (len = 0, p = ent->d_name + 3; *p;) {
495         unsigned char c;
496         int i;
497
498         for (c = 0, i = 0; i < 16; i++) {
499           if (__jlog_hexchars[i] == *p) {
500             c = i << 4;
501             break;
502           }
503         }
504         p++;
505         for (i = 0; i < 16; i++) {
506           if (__jlog_hexchars[i] == *p) {
507             c |= i;
508             break;
509           }
510         }
511         p++;
512         file[len++] = c;
513       }
514       file[len] = '\0';
515
516       js.subs[js.used++] = strdup((char *)file);
517       if(js.used == js.allocd) {
518         js.allocd *= 2;
519         js.subs = realloc(js.subs, js.allocd*sizeof(char *));
520       }
521       js.subs[js.used] = NULL;
522     }
523   }
524   closedir(dir);
525   *subs = js.subs;
526   return js.used;
527 }
528
529 static int __jlog_save_metastore(jlog_ctx *ctx, int ilocked)
530 {
531 #ifdef DEBUG
532   fprintf(stderr, "__jlog_save_metastore\n");
533 #endif
534
535   if (!ilocked && !jlog_file_lock(ctx->metastore)) {
536     return -1;
537   }
538
539   if(ctx->meta_is_mapped) {
540     int rv, flags = MS_INVALIDATE;
541     if(ctx->meta->safety == JLOG_SAFE) flags |= MS_SYNC;
542     rv = msync(ctx->meta, sizeof(*ctx->meta), flags);
543     if (!ilocked) jlog_file_unlock(ctx->metastore);
544     return rv;
545   }
546   else {
547     if (!jlog_file_pwrite(ctx->metastore, ctx->meta, sizeof(*ctx->meta), 0)) {
548       if (!ilocked) jlog_file_unlock(ctx->metastore);
549       return -1;
550     }
551     if (ctx->meta->safety == JLOG_SAFE) {
552       jlog_file_sync(ctx->metastore);
553     }
554   }
555
556   if (!ilocked) jlog_file_unlock(ctx->metastore);
557   return 0;
558 }
559
560 static int __jlog_restore_metastore(jlog_ctx *ctx, int ilocked)
561 {
562   void *base = NULL;
563   size_t len = 0;
564   if(ctx->meta_is_mapped) return 0;
565 #ifdef DEBUG
566   fprintf(stderr, "__jlog_restore_metastore\n");
567 #endif
568
569   if (!ilocked && !jlog_file_lock(ctx->metastore)) {
570     return -1;
571   }
572
573   if(ctx->meta_is_mapped == 0) {
574     int rv;
575     rv = jlog_file_map_rdwr(ctx->metastore, &base, &len);
576     if(rv != 1) {
577       if (!ilocked) jlog_file_unlock(ctx->metastore);
578       return -1;
579     }
580     if(len == 12) {
581       /* old metastore format doesn't have the new magic hdr in it
582        * we need to extend it by four bytes, but we know the hdr was
583        * previously 0, so we write out zero.
584        */
585        u_int32_t dummy = 0;
586        jlog_file_pwrite(ctx->metastore, &dummy, sizeof(dummy), 12);
587        rv = jlog_file_map_rdwr(ctx->metastore, &base, &len);
588     }
589     if(rv != 1 || len != sizeof(*ctx->meta)) {
590       if (!ilocked) jlog_file_unlock(ctx->metastore);
591       return -1;
592     }
593     ctx->meta = base;
594     ctx->meta_is_mapped = 1;
595   }
596
597   if (!ilocked) jlog_file_unlock(ctx->metastore);
598
599   if(ctx->meta != &ctx->pre_init)
600     ctx->pre_init.hdr_magic = ctx->meta->hdr_magic;
601   return 0;
602 }
603
604 int jlog_get_checkpoint(jlog_ctx *ctx, const char *s, jlog_id *id) {
605   jlog_file *f;
606   int rv = -1;
607
608   if(ctx->subscriber_name && !strcmp(ctx->subscriber_name, s)) {
609     if(!ctx->checkpoint) {
610       ctx->checkpoint = __jlog_open_named_checkpoint(ctx, s, 0);
611     }
612     f = ctx->checkpoint;
613   } else
614     f = __jlog_open_named_checkpoint(ctx, s, 0);
615
616   if (f) {
617     if (jlog_file_lock(f)) {
618       if (jlog_file_pread(f, id, sizeof(*id), 0)) rv = 0;
619       jlog_file_unlock(f);
620     }
621   }
622   if (f && f != ctx->checkpoint) jlog_file_close(f);
623   return rv;
624 }
625
626 static int __jlog_set_checkpoint(jlog_ctx *ctx, const char *s, const jlog_id *id)
627 {
628   jlog_file *f;
629   int rv = -1;
630   jlog_id old_id;
631   u_int32_t log;
632
633   if(ctx->subscriber_name && !strcmp(ctx->subscriber_name, s)) {
634     if(!ctx->checkpoint) {
635       ctx->checkpoint = __jlog_open_named_checkpoint(ctx, s, 0);
636     }
637     f = ctx->checkpoint;
638   } else
639     f = __jlog_open_named_checkpoint(ctx, s, 0);
640
641   if(!f) return -1;
642   if (!jlog_file_lock(f))
643     goto failset;
644
645   if (jlog_file_size(f) == 0) {
646     /* we're setting it for the first time, no segments were pending on it */
647     old_id.log = id->log;
648   } else {
649     if (!jlog_file_pread(f, &old_id, sizeof(old_id), 0))
650       goto failset;
651   }
652   if (!jlog_file_pwrite(f, id, sizeof(*id), 0))
653     goto failset;
654   if (ctx->meta->safety == JLOG_SAFE) {
655     jlog_file_sync(f);
656   }
657   jlog_file_unlock(f);
658   rv = 0;
659
660   for (log = old_id.log; log < id->log; log++) {
661     if (__jlog_pending_readers(ctx, log) == 0) {
662       __jlog_unlink_datafile(ctx, log);
663     }
664   }
665
666  failset:
667   if (f && f != ctx->checkpoint) jlog_file_close(f);
668   return rv;
669 }
670
671 static int __jlog_close_metastore(jlog_ctx *ctx) {
672   if (ctx->metastore) {
673     jlog_file_close(ctx->metastore);
674     ctx->metastore = NULL;
675   }
676   if (ctx->meta_is_mapped) {
677     munmap((void *)ctx->meta, sizeof(*ctx->meta));
678     ctx->meta = &ctx->pre_init;
679     ctx->meta_is_mapped = 0;
680   }
681   return 0;
682 }
683
684 /* path is assumed to be MAXPATHLEN */
685 static char *compute_checkpoint_filename(jlog_ctx *ctx, const char *subscriber, char *name)
686 {
687   const char *sub;
688   int len;
689
690   /* build checkpoint filename */
691   len = strlen(ctx->path);
692   memcpy(name, ctx->path, len);
693   name[len++] = IFS_CH;
694   name[len++] = 'c';
695   name[len++] = 'p';
696   name[len++] = '.';
697   for (sub = subscriber; *sub; ) {
698     name[len++] = __jlog_hexchars[((*sub & 0xf0) >> 4)];
699     name[len++] = __jlog_hexchars[(*sub & 0x0f)];
700     sub++;
701   }
702   name[len] = '\0';
703
704 #ifdef DEBUG
705   fprintf(stderr, "checkpoint %s filename is %s\n", subscriber, name);
706 #endif
707   return name;
708 }
709
710 static jlog_file *__jlog_open_named_checkpoint(jlog_ctx *ctx, const char *cpname, int flags)
711 {
712   char name[MAXPATHLEN];
713   compute_checkpoint_filename(ctx, cpname, name);
714   return jlog_file_open(name, flags, ctx->file_mode);
715 }
716
717 static jlog_file *__jlog_open_reader(jlog_ctx *ctx, u_int32_t log) {
718   char file[MAXPATHLEN];
719
720   if(ctx->current_log != log) {
721     __jlog_close_reader(ctx);
722     __jlog_close_indexer(ctx);
723   }
724   if(ctx->data) {
725     return ctx->data;
726   }
727   STRSETDATAFILE(ctx, file, log);
728 #ifdef DEBUG
729   fprintf(stderr, "opening log file[ro]: '%s'\n", file);
730 #endif
731   ctx->data = jlog_file_open(file, 0, ctx->file_mode);
732   ctx->current_log = log;
733   return ctx->data;
734 }
735
736 static int __jlog_munmap_reader(jlog_ctx *ctx) {
737   if(ctx->mmap_base) {
738     munmap(ctx->mmap_base, ctx->mmap_len);
739     ctx->mmap_base = NULL;
740     ctx->mmap_len = 0;
741   }
742   return 0;
743 }
744
745 static int __jlog_mmap_reader(jlog_ctx *ctx, u_int32_t log) {
746   if(ctx->current_log == log && ctx->mmap_base) return 0;
747   __jlog_open_reader(ctx, log);
748   if(!ctx->data)
749     return -1;
750   if (!jlog_file_map_read(ctx->data, &ctx->mmap_base, &ctx->mmap_len)) {
751     ctx->mmap_base = NULL;
752     ctx->last_error = JLOG_ERR_FILE_READ;
753     ctx->last_errno = errno;
754     return -1;
755   }
756   return 0;
757 }
758
759 static jlog_file *__jlog_open_writer(jlog_ctx *ctx) {
760   char file[MAXPATHLEN];
761
762   if(ctx->data) {
763     /* Still open */
764     return ctx->data;
765   }
766
767   if(!jlog_file_lock(ctx->metastore))
768     SYS_FAIL(JLOG_ERR_LOCK);
769   if(__jlog_restore_metastore(ctx, 1))
770     SYS_FAIL(JLOG_ERR_META_OPEN);
771   ctx->current_log =  ctx->meta->storage_log;
772   STRSETDATAFILE(ctx, file, ctx->current_log);
773 #ifdef DEBUG
774   fprintf(stderr, "opening log file[rw]: '%s'\n", file);
775 #endif
776   ctx->data = jlog_file_open(file, O_CREAT, ctx->file_mode);
777  finish:
778   jlog_file_unlock(ctx->metastore);
779   return ctx->data;
780 }
781
782 static int __jlog_close_writer(jlog_ctx *ctx) {
783   if (ctx->data) {
784     jlog_file_close(ctx->data);
785     ctx->data = NULL;
786   }
787   return 0;
788 }
789
790 static int __jlog_close_reader(jlog_ctx *ctx) {
791   __jlog_munmap_reader(ctx);
792   if (ctx->data) {
793     jlog_file_close(ctx->data);
794     ctx->data = NULL;
795   }
796   return 0;
797 }
798
799 static int __jlog_close_checkpoint(jlog_ctx *ctx) {
800   if (ctx->checkpoint) {
801     jlog_file_close(ctx->checkpoint);
802     ctx->checkpoint = NULL;
803   }
804   return 0;
805 }
806
807 static jlog_file *__jlog_open_indexer(jlog_ctx *ctx, u_int32_t log) {
808   char file[MAXPATHLEN];
809   int len;
810
811   if(ctx->current_log != log) {
812     __jlog_close_reader(ctx);
813     __jlog_close_indexer(ctx);
814   }
815   if(ctx->index) {
816     return ctx->index;
817   }
818   STRSETDATAFILE(ctx, file, log);
819
820   len = strlen(file);
821   if((len + sizeof(INDEX_EXT)) > sizeof(file)) return NULL;
822   memcpy(file + len, INDEX_EXT, sizeof(INDEX_EXT));
823 #ifdef DEBUG
824   fprintf(stderr, "opening index file: '%s'\n", file);
825 #endif
826   ctx->index = jlog_file_open(file, O_CREAT, ctx->file_mode);
827   ctx->current_log = log;
828   return ctx->index;
829 }
830
831 static int __jlog_close_indexer(jlog_ctx *ctx) {
832   if (ctx->index) {
833     jlog_file_close(ctx->index);
834     ctx->index = NULL;
835   }
836   return 0;
837 }
838
839 static int
840 ___jlog_resync_index(jlog_ctx *ctx, u_int32_t log, jlog_id *last,
841                      int *closed) {
842   jlog_message_header logmhdr;
843   int i, second_try = 0;
844   off_t index_off, data_off, data_len;
845   u_int64_t index;
846   u_int64_t indices[BUFFERED_INDICES];
847
848   ctx->last_error = JLOG_ERR_SUCCESS;
849   if(closed) *closed = 0;
850
851   __jlog_open_reader(ctx, log);
852   if (!ctx->data) {
853     ctx->last_error = JLOG_ERR_FILE_OPEN;
854     ctx->last_errno = errno;
855     return -1;
856   }
857
858 #define RESTART do { \
859   if (second_try == 0) { \
860     jlog_file_truncate(ctx->index, index_off); \
861     jlog_file_unlock(ctx->index); \
862     second_try = 1; \
863     ctx->last_error = JLOG_ERR_SUCCESS; \
864     goto restart; \
865   } \
866   SYS_FAIL(JLOG_ERR_IDX_CORRUPT); \
867 } while (0)
868
869 restart:
870   __jlog_open_indexer(ctx, log);
871   if (!ctx->index) {
872     ctx->last_error = JLOG_ERR_IDX_OPEN;
873     ctx->last_errno = errno;
874     return -1;
875   }
876   if (!jlog_file_lock(ctx->index)) {
877     ctx->last_error = JLOG_ERR_LOCK;
878     ctx->last_errno = errno;
879     return -1;
880   }
881
882   data_off = 0;
883   if ((data_len = jlog_file_size(ctx->data)) == -1)
884     SYS_FAIL(JLOG_ERR_FILE_SEEK);
885   if ((index_off = jlog_file_size(ctx->index)) == -1)
886     SYS_FAIL(JLOG_ERR_IDX_SEEK);
887
888   if (index_off % sizeof(u_int64_t)) {
889 #ifdef DEBUG
890     fprintf(stderr, "corrupt index [%llu]\n", index_off);
891 #endif
892     RESTART;
893   }
894
895   if (index_off > sizeof(u_int64_t)) {
896     if (!jlog_file_pread(ctx->index, &index, sizeof(index),
897                          index_off - sizeof(u_int64_t)))
898     {
899       SYS_FAIL(JLOG_ERR_IDX_READ);
900     }
901     if (index == 0) {
902       /* This log file has been "closed" */
903 #ifdef DEBUG
904       fprintf(stderr, "index closed\n");
905 #endif
906       if(last) {
907         last->log = log;
908         last->marker = (index_off / sizeof(u_int64_t)) - 1;
909       }
910       if(closed) *closed = 1;
911       goto finish;
912     } else {
913       if (index > data_len) {
914 #ifdef DEBUG
915         fprintf(stderr, "index told me to seek somehwere I can't\n");
916 #endif
917         RESTART;
918       }
919       data_off = index;
920     }
921   }
922
923   if (index_off > 0) {
924     /* We are adding onto a partial index so we must advance a record */
925     if (!jlog_file_pread(ctx->data, &logmhdr, sizeof(logmhdr), data_off))
926       SYS_FAIL(JLOG_ERR_FILE_READ);
927     if ((data_off += sizeof(logmhdr) + logmhdr.mlen) > data_len)
928       RESTART;
929   }
930
931   i = 0;
932   while (data_off + sizeof(logmhdr) <= data_len) {
933     off_t next_off = data_off;
934
935     if (!jlog_file_pread(ctx->data, &logmhdr, sizeof(logmhdr), data_off))
936       SYS_FAIL(JLOG_ERR_FILE_READ);
937     if (logmhdr.reserved != ctx->meta->hdr_magic) {
938 #ifdef DEBUG
939       fprintf(stderr, "logmhdr.reserved == %d\n", logmhdr.reserved);
940 #endif
941       SYS_FAIL(JLOG_ERR_FILE_CORRUPT);
942     }
943     if ((next_off += sizeof(logmhdr) + logmhdr.mlen) > data_len)
944       break;
945
946     /* Write our new index offset */
947     indices[i++] = data_off;
948     if(i >= BUFFERED_INDICES) {
949 #ifdef DEBUG
950       fprintf(stderr, "writing %i offsets\n", i);
951 #endif
952       if (!jlog_file_pwrite(ctx->index, indices, i * sizeof(u_int64_t), index_off))
953         RESTART;
954       index_off += i * sizeof(u_int64_t);
955       i = 0;
956     }
957     data_off = next_off;
958   }
959   if(i > 0) {
960 #ifdef DEBUG
961     fprintf(stderr, "writing %i offsets\n", i);
962 #endif
963     if (!jlog_file_pwrite(ctx->index, indices, i * sizeof(u_int64_t), index_off))
964       RESTART;
965     index_off += i * sizeof(u_int64_t);
966   }
967   if(last) {
968     last->log = log;
969     last->marker = index_off / sizeof(u_int64_t);
970   }
971   if(log < ctx->meta->storage_log) {
972     if (data_off != data_len) {
973 #ifdef DEBUG
974       fprintf(stderr, "closing index, but %llu != %llu\n", data_off, data_len);
975 #endif
976       SYS_FAIL(JLOG_ERR_FILE_CORRUPT);
977     }
978     /* Special case: if we are closing, we next write a '0'
979      * we can't write the closing marker if the data segment had no records
980      * in it, since it will be confused with an index to offset 0 by the
981      * next reader; this only happens when segments are repaired */
982     if (index_off) {
983       index = 0;
984       if (!jlog_file_pwrite(ctx->index, &index, sizeof(u_int64_t), index_off))
985         RESTART;
986       index_off += sizeof(u_int64_t);
987     }
988     if(closed) *closed = 1;
989   }
990 #undef RESTART
991
992 finish:
993   jlog_file_unlock(ctx->index);
994 #ifdef DEBUG
995   fprintf(stderr, "index is %s\n", closed?(*closed?"closed":"open"):"unknown");
996 #endif
997   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
998   return -1;
999 }
1000
1001 static int __jlog_resync_index(jlog_ctx *ctx, u_int32_t log, jlog_id *last, int *closed) {
1002   int attempts, rv = -1;
1003   for(attempts=0; attempts<4; attempts++) {
1004     rv = ___jlog_resync_index(ctx, log, last, closed);
1005     if(ctx->last_error == JLOG_ERR_SUCCESS) break;
1006     if(ctx->last_error == JLOG_ERR_FILE_OPEN ||
1007        ctx->last_error == JLOG_ERR_IDX_OPEN) break;
1008
1009     /* We can't fix the file if someone may write to it again */
1010     if(log >= ctx->meta->storage_log) break;
1011
1012     jlog_file_lock(ctx->index);
1013     /* it doesn't really matter what jlog_repair_datafile returns
1014      * we'll keep retrying anyway */
1015     jlog_repair_datafile(ctx, log);
1016     jlog_file_truncate(ctx->index, 0);
1017     jlog_file_unlock(ctx->index);
1018   }
1019   return rv;
1020 }
1021
1022 jlog_ctx *jlog_new(const char *path) {
1023   jlog_ctx *ctx;
1024   ctx = calloc(1, sizeof(*ctx));
1025   ctx->meta = &ctx->pre_init;
1026   ctx->pre_init.unit_limit = DEFAULT_UNIT_LIMIT;
1027   ctx->pre_init.safety = DEFAULT_SAFETY;
1028   ctx->pre_init.hdr_magic = DEFAULT_HDR_MAGIC;
1029   ctx->file_mode = DEFAULT_FILE_MODE;
1030   ctx->context_mode = JLOG_NEW;
1031   ctx->path = strdup(path);
1032   return ctx;
1033 }
1034
1035 void jlog_set_error_func(jlog_ctx *ctx, jlog_error_func Func, void *ptr) {
1036   ctx->error_func = Func;
1037   ctx->error_ctx = ptr;
1038 }
1039
1040 size_t jlog_raw_size(jlog_ctx *ctx) {
1041   DIR *d;
1042   struct dirent *de;
1043   size_t totalsize = 0;
1044   int ferr, len;
1045   char filename[MAXPATHLEN];
1046
1047   d = opendir(ctx->path);
1048   if(!d) return 0;
1049   len = strlen(ctx->path);
1050   memcpy(filename, ctx->path, len);
1051   filename[len++] = IFS_CH;
1052   while((de = readdir(d)) != NULL) {
1053     struct stat sb;
1054     int dlen;
1055
1056     dlen = strlen(de->d_name);
1057     if((len + dlen + 1) > sizeof(filename)) continue;
1058     memcpy(filename+len, de->d_name, dlen + 1); /* include \0 */
1059     while((ferr = stat(filename, &sb)) == -1 && errno == EINTR);
1060     if(ferr == 0 && S_ISREG(sb.st_mode)) totalsize += sb.st_size;
1061   }
1062   closedir(d);
1063   return totalsize;
1064 }
1065
1066 const char *jlog_ctx_err_string(jlog_ctx *ctx) {
1067   switch (ctx->last_error) {
1068 #define MSG_O_MATIC(x)  case x: return #x;
1069     MSG_O_MATIC( JLOG_ERR_SUCCESS);
1070     MSG_O_MATIC( JLOG_ERR_ILLEGAL_INIT);
1071     MSG_O_MATIC( JLOG_ERR_ILLEGAL_OPEN);
1072     MSG_O_MATIC( JLOG_ERR_OPEN);
1073     MSG_O_MATIC( JLOG_ERR_NOTDIR);
1074     MSG_O_MATIC( JLOG_ERR_CREATE_PATHLEN);
1075     MSG_O_MATIC( JLOG_ERR_CREATE_EXISTS);
1076     MSG_O_MATIC( JLOG_ERR_CREATE_MKDIR);
1077     MSG_O_MATIC( JLOG_ERR_CREATE_META);
1078     MSG_O_MATIC( JLOG_ERR_LOCK);
1079     MSG_O_MATIC( JLOG_ERR_IDX_OPEN);
1080     MSG_O_MATIC( JLOG_ERR_IDX_SEEK);
1081     MSG_O_MATIC( JLOG_ERR_IDX_CORRUPT);
1082     MSG_O_MATIC( JLOG_ERR_IDX_WRITE);
1083     MSG_O_MATIC( JLOG_ERR_IDX_READ);
1084     MSG_O_MATIC( JLOG_ERR_FILE_OPEN);
1085     MSG_O_MATIC( JLOG_ERR_FILE_SEEK);
1086     MSG_O_MATIC( JLOG_ERR_FILE_CORRUPT);
1087     MSG_O_MATIC( JLOG_ERR_FILE_READ);
1088     MSG_O_MATIC( JLOG_ERR_FILE_WRITE);
1089     MSG_O_MATIC( JLOG_ERR_META_OPEN);
1090     MSG_O_MATIC( JLOG_ERR_ILLEGAL_WRITE);
1091     MSG_O_MATIC( JLOG_ERR_ILLEGAL_CHECKPOINT);
1092     MSG_O_MATIC( JLOG_ERR_INVALID_SUBSCRIBER);
1093     MSG_O_MATIC( JLOG_ERR_ILLEGAL_LOGID);
1094     MSG_O_MATIC( JLOG_ERR_SUBSCRIBER_EXISTS);
1095     MSG_O_MATIC( JLOG_ERR_CHECKPOINT);
1096     MSG_O_MATIC( JLOG_ERR_NOT_SUPPORTED);
1097     MSG_O_MATIC( JLOG_ERR_CLOSE_LOGID);
1098     default: return "Unknown";
1099   }
1100 }
1101
1102 int jlog_ctx_err(jlog_ctx *ctx) {
1103   return ctx->last_error;
1104 }
1105
1106 int jlog_ctx_errno(jlog_ctx *ctx) {
1107   return ctx->last_errno;
1108 }
1109
1110 int jlog_ctx_alter_safety(jlog_ctx *ctx, jlog_safety safety) {
1111   if(ctx->meta->safety == safety) return 0;
1112   if(ctx->context_mode == JLOG_APPEND ||
1113      ctx->context_mode == JLOG_NEW) {
1114     ctx->meta->safety = safety;
1115     if(ctx->context_mode == JLOG_APPEND) {
1116       if(__jlog_save_metastore(ctx, 0) != 0) {
1117         SYS_FAIL(JLOG_ERR_CREATE_META);
1118       }
1119     }
1120     return 0;
1121   }
1122  finish:
1123   return -1;
1124 }
1125 int jlog_ctx_alter_journal_size(jlog_ctx *ctx, size_t size) {
1126   if(ctx->meta->unit_limit == size) return 0;
1127   if(ctx->context_mode == JLOG_APPEND ||
1128      ctx->context_mode == JLOG_NEW) {
1129     ctx->meta->unit_limit = size;
1130     if(ctx->context_mode == JLOG_APPEND) {
1131       if(__jlog_save_metastore(ctx, 0) != 0) {
1132         SYS_FAIL(JLOG_ERR_CREATE_META);
1133       }
1134     }
1135     return 0;
1136   }
1137  finish:
1138   return -1;
1139 }
1140 int jlog_ctx_alter_mode(jlog_ctx *ctx, int mode) {
1141   ctx->file_mode = mode;
1142   return 0;
1143 }
1144 int jlog_ctx_open_writer(jlog_ctx *ctx) {
1145   int rv;
1146   struct stat sb;
1147
1148   ctx->last_error = JLOG_ERR_SUCCESS;
1149   if(ctx->context_mode != JLOG_NEW) {
1150     ctx->last_error = JLOG_ERR_ILLEGAL_OPEN;
1151     return -1;
1152   }
1153   ctx->context_mode = JLOG_APPEND;
1154   while((rv = stat(ctx->path, &sb)) == -1 && errno == EINTR);
1155   if(rv == -1) SYS_FAIL(JLOG_ERR_OPEN);
1156   if(!S_ISDIR(sb.st_mode)) SYS_FAIL(JLOG_ERR_NOTDIR);
1157   if(__jlog_open_metastore(ctx) != 0) SYS_FAIL(JLOG_ERR_META_OPEN);
1158   if(__jlog_restore_metastore(ctx, 0)) SYS_FAIL(JLOG_ERR_META_OPEN);
1159  finish:
1160   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1161   ctx->context_mode = JLOG_INVALID;
1162   return -1;
1163 }
1164 int jlog_ctx_open_reader(jlog_ctx *ctx, const char *subscriber) {
1165   int rv;
1166   struct stat sb;
1167   jlog_id dummy;
1168
1169   ctx->last_error = JLOG_ERR_SUCCESS;
1170   if(ctx->context_mode != JLOG_NEW) {
1171     ctx->last_error = JLOG_ERR_ILLEGAL_OPEN;
1172     return -1;
1173   }
1174   ctx->context_mode = JLOG_READ;
1175   ctx->subscriber_name = strdup(subscriber);
1176   while((rv = stat(ctx->path, &sb)) == -1 && errno == EINTR);
1177   if(rv == -1) SYS_FAIL(JLOG_ERR_OPEN);
1178   if(!S_ISDIR(sb.st_mode)) SYS_FAIL(JLOG_ERR_NOTDIR);
1179   if(__jlog_open_metastore(ctx) != 0) SYS_FAIL(JLOG_ERR_META_OPEN);
1180   if(jlog_get_checkpoint(ctx, ctx->subscriber_name, &dummy))
1181     SYS_FAIL(JLOG_ERR_INVALID_SUBSCRIBER);
1182   if(__jlog_restore_metastore(ctx, 0)) SYS_FAIL(JLOG_ERR_META_OPEN);
1183  finish:
1184   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1185   ctx->context_mode = JLOG_INVALID;
1186   return -1;
1187 }
1188 int jlog_ctx_init(jlog_ctx *ctx) {
1189   int rv;
1190   struct stat sb;
1191   int dirmode;
1192
1193   ctx->last_error = JLOG_ERR_SUCCESS;
1194   if(strlen(ctx->path) > MAXLOGPATHLEN-1) {
1195     ctx->last_error = JLOG_ERR_CREATE_PATHLEN;
1196     return -1;
1197   }
1198   if(ctx->context_mode != JLOG_NEW) {
1199     ctx->last_error = JLOG_ERR_ILLEGAL_INIT;
1200     return -1;
1201   }
1202   ctx->context_mode = JLOG_INIT;
1203   while((rv = stat(ctx->path, &sb)) == -1 && errno == EINTR);
1204   if(rv == 0 || errno != ENOENT) {
1205     SYS_FAIL_EX(JLOG_ERR_CREATE_EXISTS, 0);
1206   }
1207   dirmode = ctx->file_mode;
1208   if(dirmode & 0400) dirmode |= 0100;
1209   if(dirmode & 040) dirmode |= 010;
1210   if(dirmode & 04) dirmode |= 01;
1211   if(mkdir(ctx->path, dirmode) == -1)
1212     SYS_FAIL(JLOG_ERR_CREATE_MKDIR);
1213   chmod(ctx->path, dirmode);
1214   /* Setup our initial state and store our instance metadata */
1215   if(__jlog_open_metastore(ctx) != 0)
1216     SYS_FAIL(JLOG_ERR_CREATE_META);
1217   if(__jlog_save_metastore(ctx, 0) != 0)
1218     SYS_FAIL(JLOG_ERR_CREATE_META);
1219  finish:
1220   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1221   return -1;
1222 }
1223 int jlog_ctx_close(jlog_ctx *ctx) {
1224   __jlog_close_writer(ctx);
1225   __jlog_close_indexer(ctx);
1226   __jlog_close_reader(ctx);
1227   __jlog_close_metastore(ctx);
1228   __jlog_close_checkpoint(ctx);
1229   if(ctx->subscriber_name) free(ctx->subscriber_name);
1230   if(ctx->path) free(ctx->path);
1231   free(ctx);
1232   return 0;
1233 }
1234
1235 static int __jlog_metastore_atomic_increment(jlog_ctx *ctx) {
1236   char file[MAXPATHLEN];
1237 #ifdef DEBUG
1238   fprintf(stderr, "atomic increment on %u\n", ctx->current_log);
1239 #endif
1240   if(ctx->data) SYS_FAIL(JLOG_ERR_NOT_SUPPORTED);
1241   if (!jlog_file_lock(ctx->metastore))
1242     SYS_FAIL(JLOG_ERR_LOCK);
1243   if(__jlog_restore_metastore(ctx, 1))
1244     SYS_FAIL(JLOG_ERR_META_OPEN);
1245   if(ctx->meta->storage_log == ctx->current_log) {
1246     /* We're the first ones to it, so we get to increment it */
1247     ctx->current_log++;
1248     STRSETDATAFILE(ctx, file, ctx->current_log);
1249     ctx->data = jlog_file_open(file, O_CREAT, ctx->file_mode);
1250     ctx->meta->storage_log = ctx->current_log;
1251     if(__jlog_save_metastore(ctx, 1))
1252       SYS_FAIL(JLOG_ERR_META_OPEN);
1253   }
1254  finish:
1255   jlog_file_unlock(ctx->metastore);
1256   /* Now we update our curent_log to the current storage_log,
1257    * it may have advanced farther than we know.
1258    */
1259   ctx->current_log = ctx->meta->storage_log;
1260   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1261   return -1;
1262 }
1263 int jlog_ctx_write_message(jlog_ctx *ctx, jlog_message *mess, struct timeval *when) {
1264   struct timeval now;
1265   jlog_message_header hdr;
1266   off_t current_offset;
1267
1268   ctx->last_error = JLOG_ERR_SUCCESS;
1269   if(ctx->context_mode != JLOG_APPEND) {
1270     ctx->last_error = JLOG_ERR_ILLEGAL_WRITE;
1271     ctx->last_errno = EPERM;
1272     return -1;
1273   }
1274  begin:
1275   __jlog_open_writer(ctx);
1276   if(!ctx->data) {
1277     ctx->last_error = JLOG_ERR_FILE_OPEN;
1278     ctx->last_errno = errno;
1279     return -1;
1280   }
1281   if (!jlog_file_lock(ctx->data)) {
1282     ctx->last_error = JLOG_ERR_LOCK;
1283     ctx->last_errno = errno;
1284     return -1;
1285   }
1286
1287   if ((current_offset = jlog_file_size(ctx->data)) == -1)
1288     SYS_FAIL(JLOG_ERR_FILE_SEEK);
1289   if(ctx->meta->unit_limit <= current_offset) {
1290     jlog_file_unlock(ctx->data);
1291     __jlog_close_writer(ctx);
1292     __jlog_metastore_atomic_increment(ctx);
1293     goto begin;
1294   }
1295
1296   hdr.reserved = ctx->meta->hdr_magic;
1297   if (when) {
1298     hdr.tv_sec = when->tv_sec;
1299     hdr.tv_usec = when->tv_usec;
1300   } else {
1301     gettimeofday(&now, NULL);
1302     hdr.tv_sec = now.tv_sec;
1303     hdr.tv_usec = now.tv_usec;
1304   }
1305   hdr.mlen = mess->mess_len;
1306   if (!jlog_file_pwrite(ctx->data, &hdr, sizeof(hdr), current_offset))
1307     SYS_FAIL(JLOG_ERR_FILE_WRITE);
1308   current_offset += sizeof(hdr);
1309   if (!jlog_file_pwrite(ctx->data, mess->mess, mess->mess_len, current_offset))
1310     SYS_FAIL(JLOG_ERR_FILE_WRITE);
1311   current_offset += mess->mess_len;
1312
1313   if(ctx->meta->unit_limit <= current_offset) {
1314     jlog_file_unlock(ctx->data);
1315     __jlog_close_writer(ctx);
1316     __jlog_metastore_atomic_increment(ctx);
1317     return 0;
1318   }
1319  finish:
1320   jlog_file_unlock(ctx->data);
1321   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1322   return -1;
1323 }
1324 int jlog_ctx_read_checkpoint(jlog_ctx *ctx, const jlog_id *chkpt) {
1325   ctx->last_error = JLOG_ERR_SUCCESS;
1326  
1327   if(ctx->context_mode != JLOG_READ) {
1328     ctx->last_error = JLOG_ERR_ILLEGAL_CHECKPOINT;
1329     ctx->last_errno = EPERM;
1330     return -1;
1331   }
1332   if(__jlog_set_checkpoint(ctx, ctx->subscriber_name, chkpt) != 0) {
1333     ctx->last_error = JLOG_ERR_CHECKPOINT;
1334     ctx->last_errno = 0;
1335     return -1;
1336   }
1337   return 0;
1338 }
1339
1340 int jlog_ctx_remove_subscriber(jlog_ctx *ctx, const char *s) {
1341   char name[MAXPATHLEN];
1342   int rv;
1343
1344   compute_checkpoint_filename(ctx, s, name);
1345   rv = unlink(name);
1346
1347   if (rv == 0) {
1348     ctx->last_error = JLOG_ERR_SUCCESS;
1349     return 1;
1350   }
1351   if (errno == ENOENT) {
1352     ctx->last_error = JLOG_ERR_INVALID_SUBSCRIBER;
1353     return 0;
1354   }
1355   return -1;
1356 }
1357
1358 int jlog_ctx_add_subscriber(jlog_ctx *ctx, const char *s, jlog_position whence) {
1359   jlog_id chkpt;
1360   jlog_ctx *tmpctx = NULL;
1361   jlog_file *jchkpt;
1362   ctx->last_error = JLOG_ERR_SUCCESS;
1363
1364   jchkpt = __jlog_open_named_checkpoint(ctx, s, O_CREAT|O_EXCL);
1365   if(!jchkpt) {
1366     ctx->last_errno = errno;
1367     if(errno == EEXIST)
1368       ctx->last_error = JLOG_ERR_SUBSCRIBER_EXISTS;
1369     else
1370       ctx->last_error = JLOG_ERR_OPEN;
1371     return -1;
1372   }
1373   jlog_file_close(jchkpt);
1374  
1375   if(whence == JLOG_BEGIN) {
1376     memset(&chkpt, 0, sizeof(chkpt));
1377     jlog_ctx_first_log_id(ctx, &chkpt);
1378     if(__jlog_set_checkpoint(ctx, s, &chkpt) != 0) {
1379       ctx->last_error = JLOG_ERR_CHECKPOINT;
1380       ctx->last_errno = 0;
1381       return -1;
1382     }
1383     return 0;
1384   }
1385   if(whence == JLOG_END) {
1386     jlog_id start, finish;
1387     memset(&chkpt, 0, sizeof(chkpt));
1388     if(__jlog_open_metastore(ctx) != 0) SYS_FAIL(JLOG_ERR_META_OPEN);
1389     if(__jlog_restore_metastore(ctx, 0))
1390       SYS_FAIL(JLOG_ERR_META_OPEN);
1391     chkpt.log = ctx->meta->storage_log;
1392     if(__jlog_set_checkpoint(ctx, s, &chkpt) != 0)
1393       SYS_FAIL(JLOG_ERR_CHECKPOINT);
1394     tmpctx = jlog_new(ctx->path);
1395     if(jlog_ctx_open_reader(tmpctx, s) < 0) goto finish;
1396     if(jlog_ctx_read_interval(tmpctx, &start, &finish) < 0) goto finish;
1397     jlog_ctx_close(tmpctx);
1398     tmpctx = NULL;
1399     if(__jlog_set_checkpoint(ctx, s, &finish) != 0)
1400       SYS_FAIL(JLOG_ERR_CHECKPOINT);
1401     return 0;
1402   }
1403   ctx->last_error = JLOG_ERR_NOT_SUPPORTED;
1404  finish:
1405   if(tmpctx) jlog_ctx_close(tmpctx);
1406   return -1;
1407 }
1408
1409 int jlog_ctx_write(jlog_ctx *ctx, const void *data, size_t len) {
1410   jlog_message m;
1411   m.mess = (void *)data;
1412   m.mess_len = len;
1413   return jlog_ctx_write_message(ctx, &m, NULL);
1414 }
1415
1416 static int __jlog_find_first_log_after(jlog_ctx *ctx, jlog_id *chkpt,
1417                                 jlog_id *start, jlog_id *finish) {
1418   jlog_id last;
1419   int closed;
1420
1421   memcpy(start, chkpt, sizeof(*chkpt));
1422  attempt:
1423   if(__jlog_resync_index(ctx, start->log, &last, &closed) != 0) {
1424     if(ctx->last_error == JLOG_ERR_FILE_OPEN &&
1425         ctx->last_errno == ENOENT) {
1426       char file[MAXPATHLEN];
1427       int ferr, len;
1428       struct stat sb = {0};
1429
1430       STRSETDATAFILE(ctx, file, start->log + 1);
1431       while((ferr = stat(file, &sb)) == -1 && errno == EINTR);
1432       /* That file doesn't exist... bad, but we can fake a recovery by
1433          advancing the next file that does exist */
1434       ctx->last_error = JLOG_ERR_SUCCESS;
1435       if(start->log >= ctx->meta->storage_log || (ferr != 0 && errno != ENOENT)) {
1436         /* We don't advance past where people are writing */
1437         memcpy(finish, start, sizeof(*start));
1438         return 0;
1439       }
1440       if(__jlog_resync_index(ctx, start->log + 1, &last, &closed) != 0) {
1441         /* We don't advance past where people are writing */
1442         memcpy(finish, start, sizeof(*start));
1443         return 0;
1444       }
1445       len = strlen(file);
1446       if((len + sizeof(INDEX_EXT)) > sizeof(file)) return -1;
1447       memcpy(file + len, INDEX_EXT, sizeof(INDEX_EXT));
1448       while((ferr = stat(file, &sb)) == -1 && errno == EINTR);
1449       if(ferr != 0 || sb.st_size == 0) {
1450         /* We don't advance past where people are writing */
1451         memcpy(finish, start, sizeof(*start));
1452         return 0;
1453       }
1454       start->marker = 0;
1455       start->log++;  /* BE SMARTER! */
1456       goto attempt;
1457     }
1458     return -1; /* Just persist resync's error state */
1459   }
1460
1461   /* If someone checkpoints off the end, be nice */
1462   if(last.log == start->log && last.marker < start->marker)
1463     memcpy(start, &last, sizeof(*start));
1464
1465   if(!memcmp(start, &last, sizeof(last)) && closed) {
1466     char file[MAXPATHLEN];
1467     int ferr, len;
1468     struct stat sb = {0};
1469
1470     STRSETDATAFILE(ctx, file, start->log + 1);
1471     while((ferr = stat(file, &sb)) == -1 && errno == EINTR);
1472     if(ferr) {
1473       fprintf(stderr, "stat(%s) error: %s\n", file, strerror(errno));
1474       if(start->log < ctx->meta->storage_log - 1) {
1475         start->marker = 0;
1476         start->log += 2;
1477         memcpy(finish, start, sizeof(*start));
1478         return 0;
1479       }
1480     }
1481     if(start->log >= ctx->meta->storage_log || ferr != 0 || sb.st_size == 0) {
1482       /* We don't advance past where people are writing */
1483       memcpy(finish, start, sizeof(*start));
1484       return 0;
1485     }
1486     if(__jlog_resync_index(ctx, start->log + 1, &last, &closed) != 0) {
1487       /* We don't advance past where people are writing */
1488       memcpy(finish, start, sizeof(*start));
1489       return 0;
1490     }
1491     len = strlen(file);
1492     if((len + sizeof(INDEX_EXT)) > sizeof(file)) return -1;
1493     memcpy(file + len, INDEX_EXT, sizeof(INDEX_EXT));
1494     while((ferr = stat(file, &sb)) == -1 && errno == EINTR);
1495     if(ferr != 0 || sb.st_size == 0) {
1496       /* We don't advance past where people are writing */
1497       memcpy(finish, start, sizeof(*start));
1498       return 0;
1499     }
1500     start->marker = 0;
1501     start->log++;
1502     goto attempt;
1503   }
1504   memcpy(finish, &last, sizeof(last));
1505   return 0;
1506 }
1507 int jlog_ctx_read_message(jlog_ctx *ctx, const jlog_id *id, jlog_message *m) {
1508   off_t index_len;
1509   u_int64_t data_off;
1510   int with_lock = 0;
1511
1512  once_more_with_lock:
1513
1514   ctx->last_error = JLOG_ERR_SUCCESS;
1515   if (ctx->context_mode != JLOG_READ)
1516     SYS_FAIL(JLOG_ERR_ILLEGAL_WRITE);
1517   if (id->marker < 1) {
1518     SYS_FAIL(JLOG_ERR_ILLEGAL_LOGID);
1519   }
1520
1521   __jlog_open_reader(ctx, id->log);
1522   if(!ctx->data)
1523     SYS_FAIL(JLOG_ERR_FILE_OPEN);
1524   __jlog_open_indexer(ctx, id->log);
1525   if(!ctx->index)
1526     SYS_FAIL(JLOG_ERR_IDX_OPEN);
1527
1528   if(with_lock) {
1529     if (!jlog_file_lock(ctx->index)) {
1530       with_lock = 0;
1531       SYS_FAIL(JLOG_ERR_LOCK);
1532     }
1533   }
1534
1535   if ((index_len = jlog_file_size(ctx->index)) == -1)
1536     SYS_FAIL(JLOG_ERR_IDX_SEEK);
1537   if (index_len % sizeof(u_int64_t))
1538     SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
1539   if (id->marker * sizeof(u_int64_t) > index_len) {
1540     SYS_FAIL(JLOG_ERR_ILLEGAL_LOGID);
1541   }
1542
1543   if (!jlog_file_pread(ctx->index, &data_off, sizeof(u_int64_t),
1544                        (id->marker - 1) * sizeof(u_int64_t)))
1545   {
1546     SYS_FAIL(JLOG_ERR_IDX_READ);
1547   }
1548   if (data_off == 0 && id->marker != 1) {
1549     if (id->marker * sizeof(u_int64_t) == index_len) {
1550       /* close tag; not a real offset */
1551       ctx->last_error = JLOG_ERR_CLOSE_LOGID;
1552       ctx->last_errno = 0;
1553       if(with_lock) jlog_file_unlock(ctx->index);
1554       return -1;
1555     } else {
1556       /* an offset of 0 in the middle of an index means curruption */
1557       SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
1558     }
1559   }
1560
1561   if(__jlog_mmap_reader(ctx, id->log) != 0)
1562     SYS_FAIL(JLOG_ERR_FILE_READ);
1563
1564   if(data_off > ctx->mmap_len - sizeof(jlog_message_header)) {
1565 #ifdef DEBUG
1566     fprintf(stderr, "read idx off end: %llu\n", data_off);
1567 #endif
1568     SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
1569   }
1570
1571   memcpy(&m->aligned_header, ((u_int8_t *)ctx->mmap_base) + data_off,
1572          sizeof(jlog_message_header));
1573
1574   if(data_off + sizeof(jlog_message_header) + m->aligned_header.mlen > ctx->mmap_len) {
1575 #ifdef DEBUG
1576     fprintf(stderr, "read idx off end: %llu %llu\n", data_off, ctx->mmap_len);
1577 #endif
1578     SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
1579   }
1580
1581   m->header = &m->aligned_header;
1582   m->mess_len = m->header->mlen;
1583   m->mess = (((u_int8_t *)ctx->mmap_base) + data_off + sizeof(jlog_message_header));
1584
1585  finish:
1586   if(with_lock) jlog_file_unlock(ctx->index);
1587   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1588   if(!with_lock) {
1589     if (ctx->last_error == JLOG_ERR_IDX_CORRUPT) {
1590       if (jlog_file_lock(ctx->index)) {
1591         jlog_file_truncate(ctx->index, 0);
1592         jlog_file_unlock(ctx->index);
1593       }
1594     }
1595     ___jlog_resync_index(ctx, id->log, NULL, NULL);
1596     with_lock = 1;
1597 #ifdef DEBUG
1598     fprintf(stderr, "read retrying with lock\n");
1599 #endif
1600     goto once_more_with_lock;
1601   }
1602   return -1;
1603 }
1604 int jlog_ctx_read_interval(jlog_ctx *ctx, jlog_id *start, jlog_id *finish) {
1605   jlog_id chkpt;
1606   int count = 0;
1607
1608   ctx->last_error = JLOG_ERR_SUCCESS;
1609   if(ctx->context_mode != JLOG_READ) {
1610     ctx->last_error = JLOG_ERR_ILLEGAL_WRITE;
1611     ctx->last_errno = EPERM;
1612     return -1;
1613   }
1614
1615   __jlog_restore_metastore(ctx, 0);
1616   if(jlog_get_checkpoint(ctx, ctx->subscriber_name, &chkpt))
1617     SYS_FAIL(JLOG_ERR_INVALID_SUBSCRIBER);
1618   if(__jlog_find_first_log_after(ctx, &chkpt, start, finish) != 0)
1619     goto finish; /* Leave whatever error was set in find_first_log_after */
1620   if(start->log != chkpt.log) start->marker = 0;
1621   else start->marker = chkpt.marker;
1622   if(start->log != chkpt.log) {
1623     /* We've advanced our checkpoint, let's not do this work again */
1624     if(__jlog_set_checkpoint(ctx, ctx->subscriber_name, start) != 0)
1625       SYS_FAIL(JLOG_ERR_CHECKPOINT);
1626   }
1627   /* Here 'start' is actually the checkpoint, so we must advance it one.
1628      However, that may not be possible, if there are no messages, so first
1629      make sure finish is bigger */
1630   count = finish->marker - start->marker;
1631   if(finish->marker > start->marker) start->marker++;
1632
1633   /* We need to munmap it, so that we can remap it with more data if needed */
1634   __jlog_munmap_reader(ctx);
1635  finish:
1636   if(ctx->last_error == JLOG_ERR_SUCCESS) return count;
1637   return -1;
1638 }
1639
1640 int jlog_ctx_first_log_id(jlog_ctx *ctx, jlog_id *id) {
1641   DIR *d;
1642   struct dirent *de;
1643   ctx->last_error = JLOG_ERR_SUCCESS;
1644   u_int32_t log;
1645   int found = 0;
1646
1647   id->log = 0xffffffff;
1648   id->marker = 0;
1649   d = opendir(ctx->path);
1650   if (!d) return -1;
1651
1652   while ((de = readdir(d))) {
1653     int i;
1654     char *cp = de->d_name;
1655     if(strlen(cp) != 8) continue;
1656     log = 0;
1657     for(i=0;i<8;i++) {
1658       log <<= 4;
1659       if(cp[i] >= '0' && cp[i] <= '9') log |= (cp[i] - '0');
1660       else if(cp[i] >= 'a' && cp[i] <= 'f') log |= (cp[i] - 'a' + 0xa);
1661       else if(cp[i] >= 'A' && cp[i] <= 'F') log |= (cp[i] - 'A' + 0xa);
1662       else break;
1663     }
1664     if(i != 8) continue;
1665     found = 1;
1666     if(log < id->log) id->log = log;
1667   }
1668   if(!found) id->log = 0;
1669   closedir(d);
1670   return 0;
1671 }
1672
1673 int jlog_ctx_last_log_id(jlog_ctx *ctx, jlog_id *id) {
1674   ctx->last_error = JLOG_ERR_SUCCESS;
1675   if(ctx->context_mode != JLOG_READ) {
1676     ctx->last_error = JLOG_ERR_ILLEGAL_WRITE;
1677     ctx->last_errno = EPERM;
1678     return -1;
1679   }
1680   if (__jlog_restore_metastore(ctx, 0) != 0) return -1;
1681   ___jlog_resync_index(ctx, ctx->meta->storage_log, id, NULL);
1682   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1683   return -1;
1684 }
1685
1686 int jlog_ctx_advance_id(jlog_ctx *ctx, jlog_id *cur,
1687                         jlog_id *start, jlog_id *finish)
1688 {
1689   int rv;
1690   if(memcmp(cur, finish, sizeof(jlog_id))) {
1691     start->marker++;
1692   } else {
1693     if((rv = __jlog_find_first_log_after(ctx, cur, start, finish)) != 0) {
1694       return rv;
1695     }
1696     if(cur->log != start->log) {
1697       start->marker = 1;
1698     }
1699     else start->marker = cur->marker;
1700   }
1701   return 0;
1702 }
1703
/*
 * Decide whether `f` is a data segment file name: exactly eight
 * characters, each a decimal digit or a lowercase 'a'..'f'.
 *
 * Returns 1 and, when `logid` is non-NULL, stores the parsed segment
 * number in *logid.  Returns 0 (leaving *logid alone) for anything else,
 * including names with uppercase hex digits or trailing characters.
 */
static int is_datafile(const char *f, u_int32_t *logid) {
  u_int32_t value = 0;
  int pos = 0;

  while(pos < 8) {
    const char c = f[pos];
    u_int32_t nibble;

    if(c >= '0' && c <= '9') nibble = c - '0';
    else if(c >= 'a' && c <= 'f') nibble = c - 'a' + 10;
    else return 0;   /* also catches a name that ends too early */

    value = (value << 4) | nibble;
    pos++;
  }
  if(f[pos] != '\0') return 0;
  if(logid) *logid = value;
  return 1;
}
1720
1721 int jlog_clean(const char *file) {
1722   int rv = -1;
1723   u_int32_t earliest = 0;
1724   jlog_ctx *log;
1725   DIR *dir;
1726   struct dirent *de;
1727
1728   log = jlog_new(file);
1729   jlog_ctx_open_writer(log);
1730   dir = opendir(file);
1731   if(!dir) goto out;
1732
1733   earliest = 0;
1734   if(jlog_pending_readers(log, 0, &earliest) < 0) goto out;
1735
1736   rv = 0;
1737   while((de = readdir(dir)) != NULL) {
1738     u_int32_t logid;
1739     if(is_datafile(de->d_name, &logid) && logid < earliest) {
1740       char fullfile[MAXPATHLEN];
1741       char fullidx[MAXPATHLEN];
1742       snprintf(fullfile, sizeof(fullfile), "%s/%s", file, de->d_name);
1743       snprintf(fullidx, sizeof(fullidx), "%s/%s" INDEX_EXT, file, de->d_name);
1744       (void)unlink(fullfile);
1745       (void)unlink(fullidx); /* this may not exist; don't care */
1746       rv++;
1747     }
1748   }
1749   closedir(dir);
1750  out:
1751   jlog_ctx_close(log);
1752   return rv;
1753 }
1754
1755 /* vim:se ts=2 sw=2 et: */
Note: See TracBrowser for help on using the browser.