root/jlog.c

Revision cdca96b45c192b3e48c048778e763efbf96a7595, 49.6 kB (checked in by Phil Maddox <philip.maddox@circonus.com>, 4 months ago)

Add API Call To Create New Subscriber With Copied Checkpoint

Added an API call that will create a new subscriber and copy the
checkpoint over from an existing subscriber.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2005-2008, Message Systems, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *    * Redistributions of source code must retain the above copyright
10  *      notice, this list of conditions and the following disclaimer.
11  *    * Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials provided
14  *      with the distribution.
15  *    * Neither the name Message Systems, Inc. nor the names
16  *      of its contributors may be used to endorse or promote products
17  *      derived from this software without specific prior written
18  *      permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 /*****************************************************************
34
35   Journaled logging... append only.
36
37       (1) find current file, or allocate a file, extendible and mark
38           it current.
39  
40       (2) Write records to it, records include their size, so
41           a simple inspection can detect an incomplete trailing
42           record.
43    
44       (3) Write append until the file reaches a certain size.
45
46       (4) Allocate a file, extensible.
47
48       (5) RESYNC INDEX on 'finished' file (see reading:3) and postpend
49           an offset '0' to the index.
50    
51       (6) goto (1)
52    
53   Reading journals...
54
55       (1) find oldest checkpoint of all subscribers, remove all older files.
56
57       (2) (file, last_read) = find_checkpoint for this subscriber
58
59       (3) RESYNC INDEX:
60           open record index for file, seek to the end -  off_t.
61           this is the offset of the last noticed record in this file.
62           open file, seek to this point, roll forward writing the index file
63           _do not_ write an offset for the last record unless it is found
64           complete.
65
66       (4) read entries from last_read+1 -> index of record index
67
68 */
69 #include <stdio.h>
70
71 #include "jlog_config.h"
72 #include "jlog_private.h"
73 #if HAVE_UNISTD_H
74 #include <unistd.h>
75 #endif
76 #if HAVE_SYS_TIME_H
77 #include <sys/time.h>
78 #endif
79 #if HAVE_DIRENT_H
80 #include <dirent.h>
81 #endif
82 #if HAVE_FCNTL_H
83 #include <fcntl.h>
84 #endif
85 #if HAVE_ERRNO_H
86 #include <errno.h>
87 #endif
88 #if HAVE_TIME_H
89 #include <time.h>
90 #endif
91 #if HAVE_SYS_MMAN_H
92 #include <sys/mman.h>
93 #endif
94
95 #define BUFFERED_INDICES 1024
96
97 static jlog_file *__jlog_open_writer(jlog_ctx *ctx);
98 static int __jlog_close_writer(jlog_ctx *ctx);
99 static jlog_file *__jlog_open_reader(jlog_ctx *ctx, u_int32_t log);
100 static int __jlog_close_reader(jlog_ctx *ctx);
101 static int __jlog_close_checkpoint(jlog_ctx *ctx);
102 static jlog_file *__jlog_open_indexer(jlog_ctx *ctx, u_int32_t log);
103 static int __jlog_close_indexer(jlog_ctx *ctx);
104 static int __jlog_resync_index(jlog_ctx *ctx, u_int32_t log, jlog_id *last, int *c);
105 static jlog_file *__jlog_open_named_checkpoint(jlog_ctx *ctx, const char *cpname, int flags);
106 static int __jlog_mmap_reader(jlog_ctx *ctx, u_int32_t log);
107 static int __jlog_munmap_reader(jlog_ctx *ctx);
108
109 int jlog_snprint_logid(char *b, int n, const jlog_id *id) {
110   return snprintf(b, n, "%08x:%08x", id->log, id->marker);
111 }
112
113 int jlog_repair_datafile(jlog_ctx *ctx, u_int32_t log)
114 {
115   jlog_message_header hdr;
116   char *this, *next, *afternext = NULL, *mmap_end;
117   int i, invalid_count = 0;
118   struct {
119     off_t start, end;
120   } *invalid = NULL;
121   off_t orig_len, src, dst, len;
122
123 #define TAG_INVALID(s, e) do { \
124   if (invalid_count) \
125     invalid = realloc(invalid, (invalid_count + 1) * sizeof(*invalid)); \
126   else \
127     invalid = malloc(sizeof(*invalid)); \
128   invalid[invalid_count].start = s - (char *)ctx->mmap_base; \
129   invalid[invalid_count].end = e - (char *)ctx->mmap_base; \
130   invalid_count++; \
131 } while (0)
132
133   ctx->last_error = JLOG_ERR_SUCCESS;
134
135   /* we want the reader's open logic because this runs in the read path
136    * the underlying fds are always RDWR anyway */
137   __jlog_open_reader(ctx, log);
138   if (!ctx->data) {
139     ctx->last_error = JLOG_ERR_FILE_OPEN;
140     ctx->last_errno = errno;
141     return -1;
142   }
143   if (!jlog_file_lock(ctx->data)) {
144     ctx->last_error = JLOG_ERR_LOCK;
145     ctx->last_errno = errno;
146     return -1;
147   }
148   if (__jlog_mmap_reader(ctx, log) != 0)
149     SYS_FAIL(JLOG_ERR_FILE_READ);
150
151   orig_len = ctx->mmap_len;
152   mmap_end = (char*)ctx->mmap_base + ctx->mmap_len;
153   /* these values will cause us to fall right into the error clause and
154    * start searching for a valid header from offset 0 */
155   this = (char*)ctx->mmap_base - sizeof(hdr);
156   hdr.reserved = ctx->meta->hdr_magic;
157   hdr.mlen = 0;
158
159   while (this + sizeof(hdr) <= mmap_end) {
160     next = this + sizeof(hdr) + hdr.mlen;
161     if (next <= (char *)ctx->mmap_base) goto error;
162     if (next == mmap_end) {
163       this = next;
164       break;
165     }
166     if (next + sizeof(hdr) > mmap_end) goto error;
167     memcpy(&hdr, next, sizeof(hdr));
168     if (hdr.reserved != ctx->meta->hdr_magic) goto error;
169     this = next;
170     continue;
171   error:
172     for (next = this + sizeof(hdr); next + sizeof(hdr) <= mmap_end; next++) {
173       memcpy(&hdr, next, sizeof(hdr));
174       if (hdr.reserved == ctx->meta->hdr_magic) {
175         afternext = next + sizeof(hdr) + hdr.mlen;
176         if (afternext <= (char *)ctx->mmap_base) continue;
177         if (afternext == mmap_end) break;
178         if (afternext + sizeof(hdr) > mmap_end) continue;
179         memcpy(&hdr, afternext, sizeof(hdr));
180         if (hdr.reserved == ctx->meta->hdr_magic) break;
181       }
182     }
183     /* correct for while loop entry condition */
184     if (this < (char *)ctx->mmap_base) this = ctx->mmap_base;
185     if (next + sizeof(hdr) > mmap_end) break;
186     if (next > this) TAG_INVALID(this, next);
187     this = afternext;
188   }
189   if (this != mmap_end) TAG_INVALID(this, mmap_end);
190
191 #undef TAG_INVALID
192
193 #define MOVE_SEGMENT do { \
194   char cpbuff[4096]; \
195   off_t chunk; \
196   while(len > 0) { \
197     chunk = len; \
198     if (chunk > sizeof(cpbuff)) chunk = sizeof(cpbuff); \
199     if (!jlog_file_pread(ctx->data, &cpbuff, chunk, src)) \
200       SYS_FAIL(JLOG_ERR_FILE_READ); \
201     if (!jlog_file_pwrite(ctx->data, &cpbuff, chunk, dst)) \
202       SYS_FAIL(JLOG_ERR_FILE_WRITE); \
203     src += chunk; \
204     dst += chunk; \
205     len -= chunk; \
206   } \
207 } while (0)
208
209   if (invalid_count > 0) {
210     __jlog_munmap_reader(ctx);
211     dst = invalid[0].start;
212     for (i = 0; i < invalid_count - 1; ) {
213       src = invalid[i].end;
214       len = invalid[++i].start - src;
215       MOVE_SEGMENT;
216     }
217     src = invalid[invalid_count - 1].end;
218     len = orig_len - src;
219     if (len > 0) MOVE_SEGMENT;
220     if (!jlog_file_truncate(ctx->data, dst))
221       SYS_FAIL(JLOG_ERR_FILE_WRITE);
222   }
223
224 #undef MOVE_SEGMENT
225
226 finish:
227   jlog_file_unlock(ctx->data);
228   if (invalid) free(invalid);
229   if (ctx->last_error != JLOG_ERR_SUCCESS) return -1;
230   return invalid_count;
231 }
232
233 int jlog_inspect_datafile(jlog_ctx *ctx, u_int32_t log, int verbose)
234 {
235   jlog_message_header hdr;
236   char *this, *next, *mmap_end;
237   int i;
238   time_t timet;
239   struct tm tm;
240   char tbuff[128];
241
242   ctx->last_error = JLOG_ERR_SUCCESS;
243
244   __jlog_open_reader(ctx, log);
245   if (!ctx->data)
246     SYS_FAIL(JLOG_ERR_FILE_OPEN);
247   if (__jlog_mmap_reader(ctx, log) != 0)
248     SYS_FAIL(JLOG_ERR_FILE_READ);
249
250   mmap_end = (char*)ctx->mmap_base + ctx->mmap_len;
251   this = ctx->mmap_base;
252   i = 0;
253   while (this + sizeof(hdr) <= mmap_end) {
254     int initial = 1;
255     memcpy(&hdr, this, sizeof(hdr));
256     i++;
257     if (hdr.reserved != ctx->meta->hdr_magic) {
258       fprintf(stderr, "Message %d at [%ld] has invalid reserved value %u\n",
259               i, (long int)(this - (char *)ctx->mmap_base), hdr.reserved);
260       return 1;
261     }
262
263 #define PRINTMSGHDR do { if(initial) { \
264   fprintf(stderr, "Message %d at [%ld] of (%lu+%u)", i, \
265           (long int)(this - (char *)ctx->mmap_base), \
266           (long unsigned int)sizeof(hdr), hdr.mlen); \
267   initial = 0; \
268 } } while(0)
269
270     if(verbose) {
271       PRINTMSGHDR;
272     }
273
274     next = this + sizeof(hdr) + hdr.mlen;
275     if (next <= (char *)ctx->mmap_base) {
276       PRINTMSGHDR;
277       fprintf(stderr, " WRAPPED TO NEGATIVE OFFSET!\n");
278       return 1;
279     }
280     if (next > mmap_end) {
281       PRINTMSGHDR;
282       fprintf(stderr, " OFF THE END!\n");
283       return 1;
284     }
285
286     timet = hdr.tv_sec;
287     localtime_r(&timet, &tm);
288     strftime(tbuff, sizeof(tbuff), "%c", &tm);
289     if(verbose) fprintf(stderr, "\n\ttime: %s\n\tmlen: %u\n", tbuff, hdr.mlen);
290     this = next;
291   }
292   if (this < mmap_end) {
293     fprintf(stderr, "%ld bytes of junk at the end\n",
294             (long int)(mmap_end - this));
295     return 1;
296   }
297
298   return 0;
299 finish:
300   return -1;
301 }
302
303 int jlog_idx_details(jlog_ctx *ctx, u_int32_t log,
304                      u_int32_t *marker, int *closed)
305 {
306   off_t index_len;
307   u_int64_t index;
308
309   __jlog_open_indexer(ctx, log);
310   if (!ctx->index)
311     SYS_FAIL(JLOG_ERR_IDX_OPEN);
312   if ((index_len = jlog_file_size(ctx->index)) == -1)
313     SYS_FAIL(JLOG_ERR_IDX_SEEK);
314   if (index_len % sizeof(u_int64_t))
315     SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
316   if (index_len > sizeof(u_int64_t)) {
317     if (!jlog_file_pread(ctx->index, &index, sizeof(u_int64_t),
318                          index_len - sizeof(u_int64_t)))
319     {
320       SYS_FAIL(JLOG_ERR_IDX_READ);
321     }
322     if (index) {
323       *marker = index_len / sizeof(u_int64_t);
324       *closed = 0;
325     } else {
326       *marker = (index_len / sizeof(u_int64_t)) - 1;
327       *closed = 1;
328     }
329   } else {
330     *marker = index_len / sizeof(u_int64_t);
331     *closed = 0;
332   }
333
334   return 0;
335 finish:
336   return -1;
337 }
338
339 static int __jlog_unlink_datafile(jlog_ctx *ctx, u_int32_t log) {
340   char file[MAXPATHLEN];
341   int len;
342
343   if(ctx->current_log == log) {
344     __jlog_close_reader(ctx);
345     __jlog_close_indexer(ctx);
346   }
347
348   STRSETDATAFILE(ctx, file, log);
349 #ifdef DEBUG
350   fprintf(stderr, "unlinking %s\n", file);
351 #endif
352   unlink(file);
353
354   len = strlen(file);
355   if((len + sizeof(INDEX_EXT)) > sizeof(file)) return -1;
356   memcpy(file + len, INDEX_EXT, sizeof(INDEX_EXT));
357 #ifdef DEBUG
358   fprintf(stderr, "unlinking %s\n", file);
359 #endif
360   unlink(file);
361   return 0;
362 }
363
364 static int __jlog_open_metastore(jlog_ctx *ctx)
365 {
366   char file[MAXPATHLEN];
367   int len;
368
369 #ifdef DEBUG
370   fprintf(stderr, "__jlog_open_metastore\n");
371 #endif
372   len = strlen(ctx->path);
373   if((len + 1 /* IFS_CH */ + 9 /* "metastore" */ + 1) > MAXPATHLEN) {
374 #ifdef ENAMETOOLONG
375     ctx->last_errno = ENAMETOOLONG;
376 #endif
377     ctx->last_error = JLOG_ERR_CREATE_META;
378     return -1;
379   }
380   memcpy(file, ctx->path, len);
381   file[len++] = IFS_CH;
382   memcpy(&file[len], "metastore", 10); /* "metastore" + '\0' */
383
384   ctx->metastore = jlog_file_open(file, O_CREAT, ctx->file_mode);
385
386   if (!ctx->metastore) {
387     ctx->last_errno = errno;
388     ctx->last_error = JLOG_ERR_CREATE_META;
389     return -1;
390   }
391
392   return 0;
393 }
394
395 /* exported */
396 int __jlog_pending_readers(jlog_ctx *ctx, u_int32_t log) {
397   return jlog_pending_readers(ctx, log, NULL);
398 }
399 int jlog_pending_readers(jlog_ctx *ctx, u_int32_t log,
400                          u_int32_t *earliest_out) {
401   int readers;
402   DIR *dir;
403   struct dirent *ent;
404   char file[MAXPATHLEN];
405   int len, seen = 0;
406   u_int32_t earliest = 0;
407   jlog_id id;
408
409   readers = 0;
410
411   dir = opendir(ctx->path);
412   if (!dir) return -1;
413
414   len = strlen(ctx->path);
415   if(len + 2 > sizeof(file)) {
416     closedir(dir);
417     return -1;
418   }
419   memcpy(file, ctx->path, len);
420   file[len++] = IFS_CH;
421   file[len] = '\0';
422
423   while ((ent = readdir(dir))) {
424     if (ent->d_name[0] == 'c' && ent->d_name[1] == 'p' && ent->d_name[2] == '.') {
425       jlog_file *cp;
426       int dlen;
427
428       dlen = strlen(ent->d_name);
429       if((len + dlen + 1) > sizeof(file)) continue;
430       memcpy(file + len, ent->d_name, dlen + 1); /* include \0 */
431 #ifdef DEBUG
432       fprintf(stderr, "Checking if %s needs %s...\n", ent->d_name, ctx->path);
433 #endif
434       if ((cp = jlog_file_open(file, 0, ctx->file_mode))) {
435         if (jlog_file_lock(cp)) {
436           (void) jlog_file_pread(cp, &id, sizeof(id), 0);
437 #ifdef DEBUG
438           fprintf(stderr, "\t%u <= %u (pending reader)\n", id.log, log);
439 #endif
440           if (!seen) {
441             earliest = id.log;
442             seen = 1;
443           }
444           else {
445             if(id.log < earliest) {
446               earliest = id.log;
447             }
448           }
449           if (id.log <= log) {
450             readers++;
451           }
452           jlog_file_unlock(cp);
453         }
454         jlog_file_close(cp);
455       }
456     }
457   }
458   closedir(dir);
459   if(earliest_out) *earliest_out = earliest;
460   return readers;
461 }
/* Growable list of subscriber names used by jlog_ctx_list_subscribers(). */
462 struct _jlog_subs {
463   char **subs;  /* heap array of strdup'd names, kept NULL-terminated */
464   int used;     /* number of names currently stored in subs */
465   int allocd;   /* number of slots allocated for subs */
466 };
467
468 int jlog_ctx_list_subscribers_dispose(jlog_ctx *ctx, char **subs) {
469   char *s;
470   int i = 0;
471   if(subs) {
472     while((s = subs[i++]) != NULL) free(s);
473     free(subs);
474   }
475   return 0;
476 }
477
478 int jlog_ctx_list_subscribers(jlog_ctx *ctx, char ***subs) {
479   struct _jlog_subs js = { NULL, 0, 0 };
480   DIR *dir;
481   struct dirent *ent;
482   unsigned char file[MAXPATHLEN];
483   char *p;
484   int len;
485
486   js.subs = calloc(16, sizeof(char *));
487   js.allocd = 16;
488
489   dir = opendir(ctx->path);
490   if (!dir) return -1;
491   while ((ent = readdir(dir))) {
492     if (ent->d_name[0] == 'c' && ent->d_name[1] == 'p' && ent->d_name[2] == '.') {
493
494       for (len = 0, p = ent->d_name + 3; *p;) {
495         unsigned char c;
496         int i;
497
498         for (c = 0, i = 0; i < 16; i++) {
499           if (__jlog_hexchars[i] == *p) {
500             c = i << 4;
501             break;
502           }
503         }
504         p++;
505         for (i = 0; i < 16; i++) {
506           if (__jlog_hexchars[i] == *p) {
507             c |= i;
508             break;
509           }
510         }
511         p++;
512         file[len++] = c;
513       }
514       file[len] = '\0';
515
516       js.subs[js.used++] = strdup((char *)file);
517       if(js.used == js.allocd) {
518         js.allocd *= 2;
519         js.subs = realloc(js.subs, js.allocd*sizeof(char *));
520       }
521       js.subs[js.used] = NULL;
522     }
523   }
524   closedir(dir);
525   *subs = js.subs;
526   return js.used;
527 }
528
529 static int __jlog_save_metastore(jlog_ctx *ctx, int ilocked)
530 {
531 #ifdef DEBUG
532   fprintf(stderr, "__jlog_save_metastore\n");
533 #endif
534
535   if (!ilocked && !jlog_file_lock(ctx->metastore)) {
536     return -1;
537   }
538
539   if(ctx->meta_is_mapped) {
540     int rv, flags = MS_INVALIDATE;
541     if(ctx->meta->safety == JLOG_SAFE) flags |= MS_SYNC;
542     rv = msync(ctx->meta, sizeof(*ctx->meta), flags);
543     if (!ilocked) jlog_file_unlock(ctx->metastore);
544     return rv;
545   }
546   else {
547     if (!jlog_file_pwrite(ctx->metastore, ctx->meta, sizeof(*ctx->meta), 0)) {
548       if (!ilocked) jlog_file_unlock(ctx->metastore);
549       return -1;
550     }
551     if (ctx->meta->safety == JLOG_SAFE) {
552       jlog_file_sync(ctx->metastore);
553     }
554   }
555
556   if (!ilocked) jlog_file_unlock(ctx->metastore);
557   return 0;
558 }
559
560 static int __jlog_restore_metastore(jlog_ctx *ctx, int ilocked)
561 {
562   void *base = NULL;
563   size_t len = 0;
564   if(ctx->meta_is_mapped) return 0;
565 #ifdef DEBUG
566   fprintf(stderr, "__jlog_restore_metastore\n");
567 #endif
568
569   if (!ilocked && !jlog_file_lock(ctx->metastore)) {
570     return -1;
571   }
572
573   if(ctx->meta_is_mapped == 0) {
574     int rv;
575     rv = jlog_file_map_rdwr(ctx->metastore, &base, &len);
576     if(rv != 1) {
577       if (!ilocked) jlog_file_unlock(ctx->metastore);
578       return -1;
579     }
580     if(len == 12) {
581       /* old metastore format doesn't have the new magic hdr in it
582        * we need to extend it by four bytes, but we know the hdr was
583        * previously 0, so we write out zero.
584        */
585        u_int32_t dummy = 0;
586        jlog_file_pwrite(ctx->metastore, &dummy, sizeof(dummy), 12);
587        rv = jlog_file_map_rdwr(ctx->metastore, &base, &len);
588     }
589     if(rv != 1 || len != sizeof(*ctx->meta)) {
590       if (!ilocked) jlog_file_unlock(ctx->metastore);
591       return -1;
592     }
593     ctx->meta = base;
594     ctx->meta_is_mapped = 1;
595   }
596
597   if (!ilocked) jlog_file_unlock(ctx->metastore);
598
599   if(ctx->meta != &ctx->pre_init)
600     ctx->pre_init.hdr_magic = ctx->meta->hdr_magic;
601   return 0;
602 }
603
604 int jlog_get_checkpoint(jlog_ctx *ctx, const char *s, jlog_id *id) {
605   jlog_file *f;
606   int rv = -1;
607
608   if(ctx->subscriber_name && !strcmp(ctx->subscriber_name, s)) {
609     if(!ctx->checkpoint) {
610       ctx->checkpoint = __jlog_open_named_checkpoint(ctx, s, 0);
611     }
612     f = ctx->checkpoint;
613   } else
614     f = __jlog_open_named_checkpoint(ctx, s, 0);
615
616   if (f) {
617     if (jlog_file_lock(f)) {
618       if (jlog_file_pread(f, id, sizeof(*id), 0)) rv = 0;
619       jlog_file_unlock(f);
620     }
621   }
622   if (f && f != ctx->checkpoint) jlog_file_close(f);
623   return rv;
624 }
625
626 static int __jlog_set_checkpoint(jlog_ctx *ctx, const char *s, const jlog_id *id)
627 {
628   jlog_file *f;
629   int rv = -1;
630   jlog_id old_id;
631   u_int32_t log;
632
633   if(ctx->subscriber_name && !strcmp(ctx->subscriber_name, s)) {
634     if(!ctx->checkpoint) {
635       ctx->checkpoint = __jlog_open_named_checkpoint(ctx, s, 0);
636     }
637     f = ctx->checkpoint;
638   } else
639     f = __jlog_open_named_checkpoint(ctx, s, 0);
640
641   if(!f) return -1;
642   if (!jlog_file_lock(f))
643     goto failset;
644
645   if (jlog_file_size(f) == 0) {
646     /* we're setting it for the first time, no segments were pending on it */
647     old_id.log = id->log;
648   } else {
649     if (!jlog_file_pread(f, &old_id, sizeof(old_id), 0))
650       goto failset;
651   }
652   if (!jlog_file_pwrite(f, id, sizeof(*id), 0))
653     goto failset;
654   if (ctx->meta->safety == JLOG_SAFE) {
655     jlog_file_sync(f);
656   }
657   jlog_file_unlock(f);
658   rv = 0;
659
660   for (log = old_id.log; log < id->log; log++) {
661     if (__jlog_pending_readers(ctx, log) == 0) {
662       __jlog_unlink_datafile(ctx, log);
663     }
664   }
665
666  failset:
667   if (f && f != ctx->checkpoint) jlog_file_close(f);
668   return rv;
669 }
670
671 static int __jlog_close_metastore(jlog_ctx *ctx) {
672   if (ctx->metastore) {
673     jlog_file_close(ctx->metastore);
674     ctx->metastore = NULL;
675   }
676   if (ctx->meta_is_mapped) {
677     munmap((void *)ctx->meta, sizeof(*ctx->meta));
678     ctx->meta = &ctx->pre_init;
679     ctx->meta_is_mapped = 0;
680   }
681   return 0;
682 }
683
684 /* path is assumed to be MAXPATHLEN */
685 static char *compute_checkpoint_filename(jlog_ctx *ctx, const char *subscriber, char *name)
686 {
687   const char *sub;
688   int len;
689
690   /* build checkpoint filename */
691   len = strlen(ctx->path);
692   memcpy(name, ctx->path, len);
693   name[len++] = IFS_CH;
694   name[len++] = 'c';
695   name[len++] = 'p';
696   name[len++] = '.';
697   for (sub = subscriber; *sub; ) {
698     name[len++] = __jlog_hexchars[((*sub & 0xf0) >> 4)];
699     name[len++] = __jlog_hexchars[(*sub & 0x0f)];
700     sub++;
701   }
702   name[len] = '\0';
703
704 #ifdef DEBUG
705   fprintf(stderr, "checkpoint %s filename is %s\n", subscriber, name);
706 #endif
707   return name;
708 }
709
710 static jlog_file *__jlog_open_named_checkpoint(jlog_ctx *ctx, const char *cpname, int flags)
711 {
712   char name[MAXPATHLEN];
713   compute_checkpoint_filename(ctx, cpname, name);
714   return jlog_file_open(name, flags, ctx->file_mode);
715 }
716
717 static jlog_file *__jlog_open_reader(jlog_ctx *ctx, u_int32_t log) {
718   char file[MAXPATHLEN];
719
720   if(ctx->current_log != log) {
721     __jlog_close_reader(ctx);
722     __jlog_close_indexer(ctx);
723   }
724   if(ctx->data) {
725     return ctx->data;
726   }
727   STRSETDATAFILE(ctx, file, log);
728 #ifdef DEBUG
729   fprintf(stderr, "opening log file[ro]: '%s'\n", file);
730 #endif
731   ctx->data = jlog_file_open(file, 0, ctx->file_mode);
732   ctx->current_log = log;
733   return ctx->data;
734 }
735
736 static int __jlog_munmap_reader(jlog_ctx *ctx) {
737   if(ctx->mmap_base) {
738     munmap(ctx->mmap_base, ctx->mmap_len);
739     ctx->mmap_base = NULL;
740     ctx->mmap_len = 0;
741   }
742   return 0;
743 }
744
745 static int __jlog_mmap_reader(jlog_ctx *ctx, u_int32_t log) {
746   if(ctx->current_log == log && ctx->mmap_base) return 0;
747   __jlog_open_reader(ctx, log);
748   if(!ctx->data)
749     return -1;
750   if (!jlog_file_map_read(ctx->data, &ctx->mmap_base, &ctx->mmap_len)) {
751     ctx->mmap_base = NULL;
752     ctx->last_error = JLOG_ERR_FILE_READ;
753     ctx->last_errno = errno;
754     return -1;
755   }
756   return 0;
757 }
758
759 static jlog_file *__jlog_open_writer(jlog_ctx *ctx) {
760   char file[MAXPATHLEN];
761
762   if(ctx->data) {
763     /* Still open */
764     return ctx->data;
765   }
766
767   if(!jlog_file_lock(ctx->metastore))
768     SYS_FAIL(JLOG_ERR_LOCK);
769   if(__jlog_restore_metastore(ctx, 1))
770     SYS_FAIL(JLOG_ERR_META_OPEN);
771   ctx->current_log =  ctx->meta->storage_log;
772   STRSETDATAFILE(ctx, file, ctx->current_log);
773 #ifdef DEBUG
774   fprintf(stderr, "opening log file[rw]: '%s'\n", file);
775 #endif
776   ctx->data = jlog_file_open(file, O_CREAT, ctx->file_mode);
777  finish:
778   jlog_file_unlock(ctx->metastore);
779   return ctx->data;
780 }
781
782 static int __jlog_close_writer(jlog_ctx *ctx) {
783   if (ctx->data) {
784     jlog_file_close(ctx->data);
785     ctx->data = NULL;
786   }
787   return 0;
788 }
789
790 static int __jlog_close_reader(jlog_ctx *ctx) {
791   __jlog_munmap_reader(ctx);
792   if (ctx->data) {
793     jlog_file_close(ctx->data);
794     ctx->data = NULL;
795   }
796   return 0;
797 }
798
799 static int __jlog_close_checkpoint(jlog_ctx *ctx) {
800   if (ctx->checkpoint) {
801     jlog_file_close(ctx->checkpoint);
802     ctx->checkpoint = NULL;
803   }
804   return 0;
805 }
806
807 static jlog_file *__jlog_open_indexer(jlog_ctx *ctx, u_int32_t log) {
808   char file[MAXPATHLEN];
809   int len;
810
811   if(ctx->current_log != log) {
812     __jlog_close_reader(ctx);
813     __jlog_close_indexer(ctx);
814   }
815   if(ctx->index) {
816     return ctx->index;
817   }
818   STRSETDATAFILE(ctx, file, log);
819
820   len = strlen(file);
821   if((len + sizeof(INDEX_EXT)) > sizeof(file)) return NULL;
822   memcpy(file + len, INDEX_EXT, sizeof(INDEX_EXT));
823 #ifdef DEBUG
824   fprintf(stderr, "opening index file: '%s'\n", file);
825 #endif
826   ctx->index = jlog_file_open(file, O_CREAT, ctx->file_mode);
827   ctx->current_log = log;
828   return ctx->index;
829 }
830
831 static int __jlog_close_indexer(jlog_ctx *ctx) {
832   if (ctx->index) {
833     jlog_file_close(ctx->index);
834     ctx->index = NULL;
835   }
836   return 0;
837 }
838
839 static int
840 ___jlog_resync_index(jlog_ctx *ctx, u_int32_t log, jlog_id *last,
841                      int *closed) {
842   jlog_message_header logmhdr;
843   int i, second_try = 0;
844   off_t index_off, data_off, data_len;
845   u_int64_t index;
846   u_int64_t indices[BUFFERED_INDICES];
847
848   ctx->last_error = JLOG_ERR_SUCCESS;
849   if(closed) *closed = 0;
850
851   __jlog_open_reader(ctx, log);
852   if (!ctx->data) {
853     ctx->last_error = JLOG_ERR_FILE_OPEN;
854     ctx->last_errno = errno;
855     return -1;
856   }
857
858 #define RESTART do { \
859   if (second_try == 0) { \
860     jlog_file_truncate(ctx->index, index_off); \
861     jlog_file_unlock(ctx->index); \
862     second_try = 1; \
863     ctx->last_error = JLOG_ERR_SUCCESS; \
864     goto restart; \
865   } \
866   SYS_FAIL(JLOG_ERR_IDX_CORRUPT); \
867 } while (0)
868
869 restart:
870   __jlog_open_indexer(ctx, log);
871   if (!ctx->index) {
872     ctx->last_error = JLOG_ERR_IDX_OPEN;
873     ctx->last_errno = errno;
874     return -1;
875   }
876   if (!jlog_file_lock(ctx->index)) {
877     ctx->last_error = JLOG_ERR_LOCK;
878     ctx->last_errno = errno;
879     return -1;
880   }
881
882   data_off = 0;
883   if ((data_len = jlog_file_size(ctx->data)) == -1)
884     SYS_FAIL(JLOG_ERR_FILE_SEEK);
885   if ((index_off = jlog_file_size(ctx->index)) == -1)
886     SYS_FAIL(JLOG_ERR_IDX_SEEK);
887
888   if (index_off % sizeof(u_int64_t)) {
889 #ifdef DEBUG
890     fprintf(stderr, "corrupt index [%llu]\n", index_off);
891 #endif
892     RESTART;
893   }
894
895   if (index_off > sizeof(u_int64_t)) {
896     if (!jlog_file_pread(ctx->index, &index, sizeof(index),
897                          index_off - sizeof(u_int64_t)))
898     {
899       SYS_FAIL(JLOG_ERR_IDX_READ);
900     }
901     if (index == 0) {
902       /* This log file has been "closed" */
903 #ifdef DEBUG
904       fprintf(stderr, "index closed\n");
905 #endif
906       if(last) {
907         last->log = log;
908         last->marker = (index_off / sizeof(u_int64_t)) - 1;
909       }
910       if(closed) *closed = 1;
911       goto finish;
912     } else {
913       if (index > data_len) {
914 #ifdef DEBUG
915         fprintf(stderr, "index told me to seek somehwere I can't\n");
916 #endif
917         RESTART;
918       }
919       data_off = index;
920     }
921   }
922
923   if (index_off > 0) {
924     /* We are adding onto a partial index so we must advance a record */
925     if (!jlog_file_pread(ctx->data, &logmhdr, sizeof(logmhdr), data_off))
926       SYS_FAIL(JLOG_ERR_FILE_READ);
927     if ((data_off += sizeof(logmhdr) + logmhdr.mlen) > data_len)
928       RESTART;
929   }
930
931   i = 0;
932   while (data_off + sizeof(logmhdr) <= data_len) {
933     off_t next_off = data_off;
934
935     if (!jlog_file_pread(ctx->data, &logmhdr, sizeof(logmhdr), data_off))
936       SYS_FAIL(JLOG_ERR_FILE_READ);
937     if (logmhdr.reserved != ctx->meta->hdr_magic) {
938 #ifdef DEBUG
939       fprintf(stderr, "logmhdr.reserved == %d\n", logmhdr.reserved);
940 #endif
941       SYS_FAIL(JLOG_ERR_FILE_CORRUPT);
942     }
943     if ((next_off += sizeof(logmhdr) + logmhdr.mlen) > data_len)
944       break;
945
946     /* Write our new index offset */
947     indices[i++] = data_off;
948     if(i >= BUFFERED_INDICES) {
949 #ifdef DEBUG
950       fprintf(stderr, "writing %i offsets\n", i);
951 #endif
952       if (!jlog_file_pwrite(ctx->index, indices, i * sizeof(u_int64_t), index_off))
953         RESTART;
954       index_off += i * sizeof(u_int64_t);
955       i = 0;
956     }
957     data_off = next_off;
958   }
959   if(i > 0) {
960 #ifdef DEBUG
961     fprintf(stderr, "writing %i offsets\n", i);
962 #endif
963     if (!jlog_file_pwrite(ctx->index, indices, i * sizeof(u_int64_t), index_off))
964       RESTART;
965     index_off += i * sizeof(u_int64_t);
966   }
967   if(last) {
968     last->log = log;
969     last->marker = index_off / sizeof(u_int64_t);
970   }
971   if(log < ctx->meta->storage_log) {
972     if (data_off != data_len) {
973 #ifdef DEBUG
974       fprintf(stderr, "closing index, but %llu != %llu\n", data_off, data_len);
975 #endif
976       SYS_FAIL(JLOG_ERR_FILE_CORRUPT);
977     }
978     /* Special case: if we are closing, we next write a '0'
979      * we can't write the closing marker if the data segment had no records
980      * in it, since it will be confused with an index to offset 0 by the
981      * next reader; this only happens when segments are repaired */
982     if (index_off) {
983       index = 0;
984       if (!jlog_file_pwrite(ctx->index, &index, sizeof(u_int64_t), index_off))
985         RESTART;
986       index_off += sizeof(u_int64_t);
987     }
988     if(closed) *closed = 1;
989   }
990 #undef RESTART
991
992 finish:
993   jlog_file_unlock(ctx->index);
994 #ifdef DEBUG
995   fprintf(stderr, "index is %s\n", closed?(*closed?"closed":"open"):"unknown");
996 #endif
997   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
998   return -1;
999 }
1000
1001 static int __jlog_resync_index(jlog_ctx *ctx, u_int32_t log, jlog_id *last, int *closed) {
1002   int attempts, rv = -1;
1003   for(attempts=0; attempts<4; attempts++) {
1004     rv = ___jlog_resync_index(ctx, log, last, closed);
1005     if(ctx->last_error == JLOG_ERR_SUCCESS) break;
1006     if(ctx->last_error == JLOG_ERR_FILE_OPEN ||
1007        ctx->last_error == JLOG_ERR_IDX_OPEN) break;
1008
1009     /* We can't fix the file if someone may write to it again */
1010     if(log >= ctx->meta->storage_log) break;
1011
1012     jlog_file_lock(ctx->index);
1013     /* it doesn't really matter what jlog_repair_datafile returns
1014      * we'll keep retrying anyway */
1015     jlog_repair_datafile(ctx, log);
1016     jlog_file_truncate(ctx->index, 0);
1017     jlog_file_unlock(ctx->index);
1018   }
1019   return rv;
1020 }
1021
1022 jlog_ctx *jlog_new(const char *path) {
1023   jlog_ctx *ctx;
1024   ctx = calloc(1, sizeof(*ctx));
1025   ctx->meta = &ctx->pre_init;
1026   ctx->pre_init.unit_limit = DEFAULT_UNIT_LIMIT;
1027   ctx->pre_init.safety = DEFAULT_SAFETY;
1028   ctx->pre_init.hdr_magic = DEFAULT_HDR_MAGIC;
1029   ctx->file_mode = DEFAULT_FILE_MODE;
1030   ctx->context_mode = JLOG_NEW;
1031   ctx->path = strdup(path);
1032   return ctx;
1033 }
1034
1035 void jlog_set_error_func(jlog_ctx *ctx, jlog_error_func Func, void *ptr) {
1036   ctx->error_func = Func;
1037   ctx->error_ctx = ptr;
1038 }
1039
1040 size_t jlog_raw_size(jlog_ctx *ctx) {
1041   DIR *d;
1042   struct dirent *de;
1043   size_t totalsize = 0;
1044   int ferr, len;
1045   char filename[MAXPATHLEN];
1046
1047   d = opendir(ctx->path);
1048   if(!d) return 0;
1049   len = strlen(ctx->path);
1050   memcpy(filename, ctx->path, len);
1051   filename[len++] = IFS_CH;
1052   while((de = readdir(d)) != NULL) {
1053     struct stat sb;
1054     int dlen;
1055
1056     dlen = strlen(de->d_name);
1057     if((len + dlen + 1) > sizeof(filename)) continue;
1058     memcpy(filename+len, de->d_name, dlen + 1); /* include \0 */
1059     while((ferr = stat(filename, &sb)) == -1 && errno == EINTR);
1060     if(ferr == 0 && S_ISREG(sb.st_mode)) totalsize += sb.st_size;
1061   }
1062   closedir(d);
1063   return totalsize;
1064 }
1065
1066 const char *jlog_ctx_err_string(jlog_ctx *ctx) {
1067   switch (ctx->last_error) {
1068 #define MSG_O_MATIC(x)  case x: return #x;
1069     MSG_O_MATIC( JLOG_ERR_SUCCESS);
1070     MSG_O_MATIC( JLOG_ERR_ILLEGAL_INIT);
1071     MSG_O_MATIC( JLOG_ERR_ILLEGAL_OPEN);
1072     MSG_O_MATIC( JLOG_ERR_OPEN);
1073     MSG_O_MATIC( JLOG_ERR_NOTDIR);
1074     MSG_O_MATIC( JLOG_ERR_CREATE_PATHLEN);
1075     MSG_O_MATIC( JLOG_ERR_CREATE_EXISTS);
1076     MSG_O_MATIC( JLOG_ERR_CREATE_MKDIR);
1077     MSG_O_MATIC( JLOG_ERR_CREATE_META);
1078     MSG_O_MATIC( JLOG_ERR_LOCK);
1079     MSG_O_MATIC( JLOG_ERR_IDX_OPEN);
1080     MSG_O_MATIC( JLOG_ERR_IDX_SEEK);
1081     MSG_O_MATIC( JLOG_ERR_IDX_CORRUPT);
1082     MSG_O_MATIC( JLOG_ERR_IDX_WRITE);
1083     MSG_O_MATIC( JLOG_ERR_IDX_READ);
1084     MSG_O_MATIC( JLOG_ERR_FILE_OPEN);
1085     MSG_O_MATIC( JLOG_ERR_FILE_SEEK);
1086     MSG_O_MATIC( JLOG_ERR_FILE_CORRUPT);
1087     MSG_O_MATIC( JLOG_ERR_FILE_READ);
1088     MSG_O_MATIC( JLOG_ERR_FILE_WRITE);
1089     MSG_O_MATIC( JLOG_ERR_META_OPEN);
1090     MSG_O_MATIC( JLOG_ERR_ILLEGAL_WRITE);
1091     MSG_O_MATIC( JLOG_ERR_ILLEGAL_CHECKPOINT);
1092     MSG_O_MATIC( JLOG_ERR_INVALID_SUBSCRIBER);
1093     MSG_O_MATIC( JLOG_ERR_ILLEGAL_LOGID);
1094     MSG_O_MATIC( JLOG_ERR_SUBSCRIBER_EXISTS);
1095     MSG_O_MATIC( JLOG_ERR_CHECKPOINT);
1096     MSG_O_MATIC( JLOG_ERR_NOT_SUPPORTED);
1097     MSG_O_MATIC( JLOG_ERR_CLOSE_LOGID);
1098     default: return "Unknown";
1099   }
1100 }
1101
1102 int jlog_ctx_err(jlog_ctx *ctx) {
1103   return ctx->last_error;
1104 }
1105
1106 int jlog_ctx_errno(jlog_ctx *ctx) {
1107   return ctx->last_errno;
1108 }
1109
1110 int jlog_ctx_alter_safety(jlog_ctx *ctx, jlog_safety safety) {
1111   if(ctx->meta->safety == safety) return 0;
1112   if(ctx->context_mode == JLOG_APPEND ||
1113      ctx->context_mode == JLOG_NEW) {
1114     ctx->meta->safety = safety;
1115     if(ctx->context_mode == JLOG_APPEND) {
1116       if(__jlog_save_metastore(ctx, 0) != 0) {
1117         SYS_FAIL(JLOG_ERR_CREATE_META);
1118       }
1119     }
1120     return 0;
1121   }
1122  finish:
1123   return -1;
1124 }
1125 int jlog_ctx_alter_journal_size(jlog_ctx *ctx, size_t size) {
1126   if(ctx->meta->unit_limit == size) return 0;
1127   if(ctx->context_mode == JLOG_APPEND ||
1128      ctx->context_mode == JLOG_NEW) {
1129     ctx->meta->unit_limit = size;
1130     if(ctx->context_mode == JLOG_APPEND) {
1131       if(__jlog_save_metastore(ctx, 0) != 0) {
1132         SYS_FAIL(JLOG_ERR_CREATE_META);
1133       }
1134     }
1135     return 0;
1136   }
1137  finish:
1138   return -1;
1139 }
1140 int jlog_ctx_alter_mode(jlog_ctx *ctx, int mode) {
1141   ctx->file_mode = mode;
1142   return 0;
1143 }
1144 int jlog_ctx_open_writer(jlog_ctx *ctx) {
1145   int rv;
1146   struct stat sb;
1147
1148   ctx->last_error = JLOG_ERR_SUCCESS;
1149   if(ctx->context_mode != JLOG_NEW) {
1150     ctx->last_error = JLOG_ERR_ILLEGAL_OPEN;
1151     return -1;
1152   }
1153   ctx->context_mode = JLOG_APPEND;
1154   while((rv = stat(ctx->path, &sb)) == -1 && errno == EINTR);
1155   if(rv == -1) SYS_FAIL(JLOG_ERR_OPEN);
1156   if(!S_ISDIR(sb.st_mode)) SYS_FAIL(JLOG_ERR_NOTDIR);
1157   if(__jlog_open_metastore(ctx) != 0) SYS_FAIL(JLOG_ERR_META_OPEN);
1158   if(__jlog_restore_metastore(ctx, 0)) SYS_FAIL(JLOG_ERR_META_OPEN);
1159  finish:
1160   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1161   ctx->context_mode = JLOG_INVALID;
1162   return -1;
1163 }
1164 int jlog_ctx_open_reader(jlog_ctx *ctx, const char *subscriber) {
1165   int rv;
1166   struct stat sb;
1167   jlog_id dummy;
1168
1169   ctx->last_error = JLOG_ERR_SUCCESS;
1170   if(ctx->context_mode != JLOG_NEW) {
1171     ctx->last_error = JLOG_ERR_ILLEGAL_OPEN;
1172     return -1;
1173   }
1174   ctx->context_mode = JLOG_READ;
1175   ctx->subscriber_name = strdup(subscriber);
1176   while((rv = stat(ctx->path, &sb)) == -1 && errno == EINTR);
1177   if(rv == -1) SYS_FAIL(JLOG_ERR_OPEN);
1178   if(!S_ISDIR(sb.st_mode)) SYS_FAIL(JLOG_ERR_NOTDIR);
1179   if(__jlog_open_metastore(ctx) != 0) SYS_FAIL(JLOG_ERR_META_OPEN);
1180   if(jlog_get_checkpoint(ctx, ctx->subscriber_name, &dummy))
1181     SYS_FAIL(JLOG_ERR_INVALID_SUBSCRIBER);
1182   if(__jlog_restore_metastore(ctx, 0)) SYS_FAIL(JLOG_ERR_META_OPEN);
1183  finish:
1184   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1185   ctx->context_mode = JLOG_INVALID;
1186   return -1;
1187 }
1188 int jlog_ctx_init(jlog_ctx *ctx) {
1189   int rv;
1190   struct stat sb;
1191   int dirmode;
1192
1193   ctx->last_error = JLOG_ERR_SUCCESS;
1194   if(strlen(ctx->path) > MAXLOGPATHLEN-1) {
1195     ctx->last_error = JLOG_ERR_CREATE_PATHLEN;
1196     return -1;
1197   }
1198   if(ctx->context_mode != JLOG_NEW) {
1199     ctx->last_error = JLOG_ERR_ILLEGAL_INIT;
1200     return -1;
1201   }
1202   ctx->context_mode = JLOG_INIT;
1203   while((rv = stat(ctx->path, &sb)) == -1 && errno == EINTR);
1204   if(rv == 0 || errno != ENOENT) {
1205     SYS_FAIL_EX(JLOG_ERR_CREATE_EXISTS, 0);
1206   }
1207   dirmode = ctx->file_mode;
1208   if(dirmode & 0400) dirmode |= 0100;
1209   if(dirmode & 040) dirmode |= 010;
1210   if(dirmode & 04) dirmode |= 01;
1211   if(mkdir(ctx->path, dirmode) == -1)
1212     SYS_FAIL(JLOG_ERR_CREATE_MKDIR);
1213   chmod(ctx->path, dirmode);
1214   /* Setup our initial state and store our instance metadata */
1215   if(__jlog_open_metastore(ctx) != 0)
1216     SYS_FAIL(JLOG_ERR_CREATE_META);
1217   if(__jlog_save_metastore(ctx, 0) != 0)
1218     SYS_FAIL(JLOG_ERR_CREATE_META);
1219  finish:
1220   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1221   return -1;
1222 }
1223 int jlog_ctx_close(jlog_ctx *ctx) {
1224   __jlog_close_writer(ctx);
1225   __jlog_close_indexer(ctx);
1226   __jlog_close_reader(ctx);
1227   __jlog_close_metastore(ctx);
1228   __jlog_close_checkpoint(ctx);
1229   if(ctx->subscriber_name) free(ctx->subscriber_name);
1230   if(ctx->path) free(ctx->path);
1231   free(ctx);
1232   return 0;
1233 }
1234
1235 static int __jlog_metastore_atomic_increment(jlog_ctx *ctx) {
1236   char file[MAXPATHLEN];
1237 #ifdef DEBUG
1238   fprintf(stderr, "atomic increment on %u\n", ctx->current_log);
1239 #endif
1240   if(ctx->data) SYS_FAIL(JLOG_ERR_NOT_SUPPORTED);
1241   if (!jlog_file_lock(ctx->metastore))
1242     SYS_FAIL(JLOG_ERR_LOCK);
1243   if(__jlog_restore_metastore(ctx, 1))
1244     SYS_FAIL(JLOG_ERR_META_OPEN);
1245   if(ctx->meta->storage_log == ctx->current_log) {
1246     /* We're the first ones to it, so we get to increment it */
1247     ctx->current_log++;
1248     STRSETDATAFILE(ctx, file, ctx->current_log);
1249     ctx->data = jlog_file_open(file, O_CREAT, ctx->file_mode);
1250     ctx->meta->storage_log = ctx->current_log;
1251     if(__jlog_save_metastore(ctx, 1))
1252       SYS_FAIL(JLOG_ERR_META_OPEN);
1253   }
1254  finish:
1255   jlog_file_unlock(ctx->metastore);
1256   /* Now we update our curent_log to the current storage_log,
1257    * it may have advanced farther than we know.
1258    */
1259   ctx->current_log = ctx->meta->storage_log;
1260   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1261   return -1;
1262 }
1263 int jlog_ctx_write_message(jlog_ctx *ctx, jlog_message *mess, struct timeval *when) {
1264   struct timeval now;
1265   jlog_message_header hdr;
1266   off_t current_offset;
1267
1268   ctx->last_error = JLOG_ERR_SUCCESS;
1269   if(ctx->context_mode != JLOG_APPEND) {
1270     ctx->last_error = JLOG_ERR_ILLEGAL_WRITE;
1271     ctx->last_errno = EPERM;
1272     return -1;
1273   }
1274  begin:
1275   __jlog_open_writer(ctx);
1276   if(!ctx->data) {
1277     ctx->last_error = JLOG_ERR_FILE_OPEN;
1278     ctx->last_errno = errno;
1279     return -1;
1280   }
1281   if (!jlog_file_lock(ctx->data)) {
1282     ctx->last_error = JLOG_ERR_LOCK;
1283     ctx->last_errno = errno;
1284     return -1;
1285   }
1286
1287   if ((current_offset = jlog_file_size(ctx->data)) == -1)
1288     SYS_FAIL(JLOG_ERR_FILE_SEEK);
1289   if(ctx->meta->unit_limit <= current_offset) {
1290     jlog_file_unlock(ctx->data);
1291     __jlog_close_writer(ctx);
1292     __jlog_metastore_atomic_increment(ctx);
1293     goto begin;
1294   }
1295
1296   hdr.reserved = ctx->meta->hdr_magic;
1297   if (when) {
1298     hdr.tv_sec = when->tv_sec;
1299     hdr.tv_usec = when->tv_usec;
1300   } else {
1301     gettimeofday(&now, NULL);
1302     hdr.tv_sec = now.tv_sec;
1303     hdr.tv_usec = now.tv_usec;
1304   }
1305   hdr.mlen = mess->mess_len;
1306   if (!jlog_file_pwrite(ctx->data, &hdr, sizeof(hdr), current_offset))
1307     SYS_FAIL(JLOG_ERR_FILE_WRITE);
1308   current_offset += sizeof(hdr);
1309   if (!jlog_file_pwrite(ctx->data, mess->mess, mess->mess_len, current_offset))
1310     SYS_FAIL(JLOG_ERR_FILE_WRITE);
1311   current_offset += mess->mess_len;
1312
1313   if(ctx->meta->unit_limit <= current_offset) {
1314     jlog_file_unlock(ctx->data);
1315     __jlog_close_writer(ctx);
1316     __jlog_metastore_atomic_increment(ctx);
1317     return 0;
1318   }
1319  finish:
1320   jlog_file_unlock(ctx->data);
1321   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1322   return -1;
1323 }
1324 int jlog_ctx_read_checkpoint(jlog_ctx *ctx, const jlog_id *chkpt) {
1325   ctx->last_error = JLOG_ERR_SUCCESS;
1326  
1327   if(ctx->context_mode != JLOG_READ) {
1328     ctx->last_error = JLOG_ERR_ILLEGAL_CHECKPOINT;
1329     ctx->last_errno = EPERM;
1330     return -1;
1331   }
1332   if(__jlog_set_checkpoint(ctx, ctx->subscriber_name, chkpt) != 0) {
1333     ctx->last_error = JLOG_ERR_CHECKPOINT;
1334     ctx->last_errno = 0;
1335     return -1;
1336   }
1337   return 0;
1338 }
1339
1340 int jlog_ctx_remove_subscriber(jlog_ctx *ctx, const char *s) {
1341   char name[MAXPATHLEN];
1342   int rv;
1343
1344   compute_checkpoint_filename(ctx, s, name);
1345   rv = unlink(name);
1346
1347   if (rv == 0) {
1348     ctx->last_error = JLOG_ERR_SUCCESS;
1349     return 1;
1350   }
1351   if (errno == ENOENT) {
1352     ctx->last_error = JLOG_ERR_INVALID_SUBSCRIBER;
1353     return 0;
1354   }
1355   return -1;
1356 }
1357
1358 int jlog_ctx_add_subscriber(jlog_ctx *ctx, const char *s, jlog_position whence) {
1359   jlog_id chkpt;
1360   jlog_ctx *tmpctx = NULL;
1361   jlog_file *jchkpt;
1362   ctx->last_error = JLOG_ERR_SUCCESS;
1363
1364   jchkpt = __jlog_open_named_checkpoint(ctx, s, O_CREAT|O_EXCL);
1365   if(!jchkpt) {
1366     ctx->last_errno = errno;
1367     if(errno == EEXIST)
1368       ctx->last_error = JLOG_ERR_SUBSCRIBER_EXISTS;
1369     else
1370       ctx->last_error = JLOG_ERR_OPEN;
1371     return -1;
1372   }
1373   jlog_file_close(jchkpt);
1374  
1375   if(whence == JLOG_BEGIN) {
1376     memset(&chkpt, 0, sizeof(chkpt));
1377     jlog_ctx_first_log_id(ctx, &chkpt);
1378     if(__jlog_set_checkpoint(ctx, s, &chkpt) != 0) {
1379       ctx->last_error = JLOG_ERR_CHECKPOINT;
1380       ctx->last_errno = 0;
1381       return -1;
1382     }
1383     return 0;
1384   }
1385   if(whence == JLOG_END) {
1386     jlog_id start, finish;
1387     memset(&chkpt, 0, sizeof(chkpt));
1388     if(__jlog_open_metastore(ctx) != 0) SYS_FAIL(JLOG_ERR_META_OPEN);
1389     if(__jlog_restore_metastore(ctx, 0))
1390       SYS_FAIL(JLOG_ERR_META_OPEN);
1391     chkpt.log = ctx->meta->storage_log;
1392     if(__jlog_set_checkpoint(ctx, s, &chkpt) != 0)
1393       SYS_FAIL(JLOG_ERR_CHECKPOINT);
1394     tmpctx = jlog_new(ctx->path);
1395     if(jlog_ctx_open_reader(tmpctx, s) < 0) goto finish;
1396     if(jlog_ctx_read_interval(tmpctx, &start, &finish) < 0) goto finish;
1397     jlog_ctx_close(tmpctx);
1398     tmpctx = NULL;
1399     if(__jlog_set_checkpoint(ctx, s, &finish) != 0)
1400       SYS_FAIL(JLOG_ERR_CHECKPOINT);
1401     return 0;
1402   }
1403   ctx->last_error = JLOG_ERR_NOT_SUPPORTED;
1404  finish:
1405   if(tmpctx) jlog_ctx_close(tmpctx);
1406   return -1;
1407 }
1408
1409 int jlog_ctx_add_subscriber_copy_checkpoint(jlog_ctx *old_ctx, const char *new,
1410                                 const char *old) {
1411   jlog_id chkpt;
1412   jlog_ctx *new_ctx = NULL;;
1413
1414   /* If there's no old checkpoint available, just return */
1415   if (jlog_get_checkpoint(old_ctx, old, &chkpt)) {
1416     return -1;
1417   }
1418
1419   /* If we can't open the jlog_ctx, just return */
1420   new_ctx = jlog_new(old_ctx->path);
1421   if (!new_ctx) {
1422     return -1;
1423   }
1424   if (jlog_ctx_add_subscriber(new_ctx, new, JLOG_BEGIN)) {
1425     /* If it already exists, we want to overwrite it */
1426     if (errno != EEXIST) {
1427       jlog_ctx_close(new_ctx);
1428       return -1;
1429     }
1430   }
1431
1432   /* Open a reader for the new subscriber */
1433   if(jlog_ctx_open_reader(new_ctx, new) < 0) {
1434     jlog_ctx_close(new_ctx);
1435     return -1;
1436   }
1437
1438   /* Set the checkpoint of the new subscriber to
1439    * the old subscriber's checkpoint */
1440   if (jlog_ctx_read_checkpoint(new_ctx, &chkpt)) {
1441     return -1;
1442   }
1443
1444   jlog_ctx_close(new_ctx);
1445   return 0;
1446 }
1447
1448 int jlog_ctx_write(jlog_ctx *ctx, const void *data, size_t len) {
1449   jlog_message m;
1450   m.mess = (void *)data;
1451   m.mess_len = len;
1452   return jlog_ctx_write_message(ctx, &m, NULL);
1453 }
1454
1455 static int __jlog_find_first_log_after(jlog_ctx *ctx, jlog_id *chkpt,
1456                                 jlog_id *start, jlog_id *finish) {
1457   jlog_id last;
1458   int closed;
1459
1460   memcpy(start, chkpt, sizeof(*chkpt));
1461  attempt:
1462   if(__jlog_resync_index(ctx, start->log, &last, &closed) != 0) {
1463     if(ctx->last_error == JLOG_ERR_FILE_OPEN &&
1464         ctx->last_errno == ENOENT) {
1465       char file[MAXPATHLEN];
1466       int ferr, len;
1467       struct stat sb = {0};
1468
1469       STRSETDATAFILE(ctx, file, start->log + 1);
1470       while((ferr = stat(file, &sb)) == -1 && errno == EINTR);
1471       /* That file doesn't exist... bad, but we can fake a recovery by
1472          advancing the next file that does exist */
1473       ctx->last_error = JLOG_ERR_SUCCESS;
1474       if(start->log >= ctx->meta->storage_log || (ferr != 0 && errno != ENOENT)) {
1475         /* We don't advance past where people are writing */
1476         memcpy(finish, start, sizeof(*start));
1477         return 0;
1478       }
1479       if(__jlog_resync_index(ctx, start->log + 1, &last, &closed) != 0) {
1480         /* We don't advance past where people are writing */
1481         memcpy(finish, start, sizeof(*start));
1482         return 0;
1483       }
1484       len = strlen(file);
1485       if((len + sizeof(INDEX_EXT)) > sizeof(file)) return -1;
1486       memcpy(file + len, INDEX_EXT, sizeof(INDEX_EXT));
1487       while((ferr = stat(file, &sb)) == -1 && errno == EINTR);
1488       if(ferr != 0 || sb.st_size == 0) {
1489         /* We don't advance past where people are writing */
1490         memcpy(finish, start, sizeof(*start));
1491         return 0;
1492       }
1493       start->marker = 0;
1494       start->log++;  /* BE SMARTER! */
1495       goto attempt;
1496     }
1497     return -1; /* Just persist resync's error state */
1498   }
1499
1500   /* If someone checkpoints off the end, be nice */
1501   if(last.log == start->log && last.marker < start->marker)
1502     memcpy(start, &last, sizeof(*start));
1503
1504   if(!memcmp(start, &last, sizeof(last)) && closed) {
1505     char file[MAXPATHLEN];
1506     int ferr, len;
1507     struct stat sb = {0};
1508
1509     STRSETDATAFILE(ctx, file, start->log + 1);
1510     while((ferr = stat(file, &sb)) == -1 && errno == EINTR);
1511     if(ferr) {
1512       fprintf(stderr, "stat(%s) error: %s\n", file, strerror(errno));
1513       if(start->log < ctx->meta->storage_log - 1) {
1514         start->marker = 0;
1515         start->log += 2;
1516         memcpy(finish, start, sizeof(*start));
1517         return 0;
1518       }
1519     }
1520     if(start->log >= ctx->meta->storage_log || ferr != 0 || sb.st_size == 0) {
1521       /* We don't advance past where people are writing */
1522       memcpy(finish, start, sizeof(*start));
1523       return 0;
1524     }
1525     if(__jlog_resync_index(ctx, start->log + 1, &last, &closed) != 0) {
1526       /* We don't advance past where people are writing */
1527       memcpy(finish, start, sizeof(*start));
1528       return 0;
1529     }
1530     len = strlen(file);
1531     if((len + sizeof(INDEX_EXT)) > sizeof(file)) return -1;
1532     memcpy(file + len, INDEX_EXT, sizeof(INDEX_EXT));
1533     while((ferr = stat(file, &sb)) == -1 && errno == EINTR);
1534     if(ferr != 0 || sb.st_size == 0) {
1535       /* We don't advance past where people are writing */
1536       memcpy(finish, start, sizeof(*start));
1537       return 0;
1538     }
1539     start->marker = 0;
1540     start->log++;
1541     goto attempt;
1542   }
1543   memcpy(finish, &last, sizeof(last));
1544   return 0;
1545 }
1546 int jlog_ctx_read_message(jlog_ctx *ctx, const jlog_id *id, jlog_message *m) {
1547   off_t index_len;
1548   u_int64_t data_off;
1549   int with_lock = 0;
1550
1551  once_more_with_lock:
1552
1553   ctx->last_error = JLOG_ERR_SUCCESS;
1554   if (ctx->context_mode != JLOG_READ)
1555     SYS_FAIL(JLOG_ERR_ILLEGAL_WRITE);
1556   if (id->marker < 1) {
1557     SYS_FAIL(JLOG_ERR_ILLEGAL_LOGID);
1558   }
1559
1560   __jlog_open_reader(ctx, id->log);
1561   if(!ctx->data)
1562     SYS_FAIL(JLOG_ERR_FILE_OPEN);
1563   __jlog_open_indexer(ctx, id->log);
1564   if(!ctx->index)
1565     SYS_FAIL(JLOG_ERR_IDX_OPEN);
1566
1567   if(with_lock) {
1568     if (!jlog_file_lock(ctx->index)) {
1569       with_lock = 0;
1570       SYS_FAIL(JLOG_ERR_LOCK);
1571     }
1572   }
1573
1574   if ((index_len = jlog_file_size(ctx->index)) == -1)
1575     SYS_FAIL(JLOG_ERR_IDX_SEEK);
1576   if (index_len % sizeof(u_int64_t))
1577     SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
1578   if (id->marker * sizeof(u_int64_t) > index_len) {
1579     SYS_FAIL(JLOG_ERR_ILLEGAL_LOGID);
1580   }
1581
1582   if (!jlog_file_pread(ctx->index, &data_off, sizeof(u_int64_t),
1583                        (id->marker - 1) * sizeof(u_int64_t)))
1584   {
1585     SYS_FAIL(JLOG_ERR_IDX_READ);
1586   }
1587   if (data_off == 0 && id->marker != 1) {
1588     if (id->marker * sizeof(u_int64_t) == index_len) {
1589       /* close tag; not a real offset */
1590       ctx->last_error = JLOG_ERR_CLOSE_LOGID;
1591       ctx->last_errno = 0;
1592       if(with_lock) jlog_file_unlock(ctx->index);
1593       return -1;
1594     } else {
1595       /* an offset of 0 in the middle of an index means curruption */
1596       SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
1597     }
1598   }
1599
1600   if(__jlog_mmap_reader(ctx, id->log) != 0)
1601     SYS_FAIL(JLOG_ERR_FILE_READ);
1602
1603   if(data_off > ctx->mmap_len - sizeof(jlog_message_header)) {
1604 #ifdef DEBUG
1605     fprintf(stderr, "read idx off end: %llu\n", data_off);
1606 #endif
1607     SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
1608   }
1609
1610   memcpy(&m->aligned_header, ((u_int8_t *)ctx->mmap_base) + data_off,
1611          sizeof(jlog_message_header));
1612
1613   if(data_off + sizeof(jlog_message_header) + m->aligned_header.mlen > ctx->mmap_len) {
1614 #ifdef DEBUG
1615     fprintf(stderr, "read idx off end: %llu %llu\n", data_off, ctx->mmap_len);
1616 #endif
1617     SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
1618   }
1619
1620   m->header = &m->aligned_header;
1621   m->mess_len = m->header->mlen;
1622   m->mess = (((u_int8_t *)ctx->mmap_base) + data_off + sizeof(jlog_message_header));
1623
1624  finish:
1625   if(with_lock) jlog_file_unlock(ctx->index);
1626   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1627   if(!with_lock) {
1628     if (ctx->last_error == JLOG_ERR_IDX_CORRUPT) {
1629       if (jlog_file_lock(ctx->index)) {
1630         jlog_file_truncate(ctx->index, 0);
1631         jlog_file_unlock(ctx->index);
1632       }
1633     }
1634     ___jlog_resync_index(ctx, id->log, NULL, NULL);
1635     with_lock = 1;
1636 #ifdef DEBUG
1637     fprintf(stderr, "read retrying with lock\n");
1638 #endif
1639     goto once_more_with_lock;
1640   }
1641   return -1;
1642 }
1643 int jlog_ctx_read_interval(jlog_ctx *ctx, jlog_id *start, jlog_id *finish) {
1644   jlog_id chkpt;
1645   int count = 0;
1646
1647   ctx->last_error = JLOG_ERR_SUCCESS;
1648   if(ctx->context_mode != JLOG_READ) {
1649     ctx->last_error = JLOG_ERR_ILLEGAL_WRITE;
1650     ctx->last_errno = EPERM;
1651     return -1;
1652   }
1653
1654   __jlog_restore_metastore(ctx, 0);
1655   if(jlog_get_checkpoint(ctx, ctx->subscriber_name, &chkpt))
1656     SYS_FAIL(JLOG_ERR_INVALID_SUBSCRIBER);
1657   if(__jlog_find_first_log_after(ctx, &chkpt, start, finish) != 0)
1658     goto finish; /* Leave whatever error was set in find_first_log_after */
1659   if(start->log != chkpt.log) start->marker = 0;
1660   else start->marker = chkpt.marker;
1661   if(start->log != chkpt.log) {
1662     /* We've advanced our checkpoint, let's not do this work again */
1663     if(__jlog_set_checkpoint(ctx, ctx->subscriber_name, start) != 0)
1664       SYS_FAIL(JLOG_ERR_CHECKPOINT);
1665   }
1666   /* Here 'start' is actually the checkpoint, so we must advance it one.
1667      However, that may not be possible, if there are no messages, so first
1668      make sure finish is bigger */
1669   count = finish->marker - start->marker;
1670   if(finish->marker > start->marker) start->marker++;
1671
1672   /* If the count is less than zero, the checkpoint is off the end
1673    * of the file. When this happens, we need to set it to the end of
1674    * the file */
1675   if (count < 0) {
1676     fprintf(stderr, "need to repair checkpoint for %s - start (%08x:%08x) > finish (%08x:%08x)\n", ctx->path,
1677         start->log, start->marker, finish->log, finish->marker);
1678     if(__jlog_set_checkpoint(ctx, ctx->subscriber_name, finish) != 0) {
1679       fprintf(stderr, "failed repairing checkpoint for %s\n", ctx->path);
1680       SYS_FAIL(JLOG_ERR_CHECKPOINT);
1681     }
1682     if(jlog_get_checkpoint(ctx, ctx->subscriber_name, &chkpt)) {
1683       /* Should never happen */
1684       SYS_FAIL(JLOG_ERR_INVALID_SUBSCRIBER);
1685     }
1686     fprintf(stderr, "repaired checkpoint for %s: %08x:%08x\n", ctx->path, chkpt.log, chkpt.marker);
1687     ctx->last_error = JLOG_ERR_SUCCESS;
1688     count = 0;
1689   }
1690
1691   /* We need to munmap it, so that we can remap it with more data if needed */
1692   __jlog_munmap_reader(ctx);
1693  finish:
1694   if(ctx->last_error == JLOG_ERR_SUCCESS) return count;
1695   return -1;
1696 }
1697
1698 int jlog_ctx_first_log_id(jlog_ctx *ctx, jlog_id *id) {
1699   DIR *d;
1700   struct dirent *de;
1701   ctx->last_error = JLOG_ERR_SUCCESS;
1702   u_int32_t log;
1703   int found = 0;
1704
1705   id->log = 0xffffffff;
1706   id->marker = 0;
1707   d = opendir(ctx->path);
1708   if (!d) return -1;
1709
1710   while ((de = readdir(d))) {
1711     int i;
1712     char *cp = de->d_name;
1713     if(strlen(cp) != 8) continue;
1714     log = 0;
1715     for(i=0;i<8;i++) {
1716       log <<= 4;
1717       if(cp[i] >= '0' && cp[i] <= '9') log |= (cp[i] - '0');
1718       else if(cp[i] >= 'a' && cp[i] <= 'f') log |= (cp[i] - 'a' + 0xa);
1719       else if(cp[i] >= 'A' && cp[i] <= 'F') log |= (cp[i] - 'A' + 0xa);
1720       else break;
1721     }
1722     if(i != 8) continue;
1723     found = 1;
1724     if(log < id->log) id->log = log;
1725   }
1726   if(!found) id->log = 0;
1727   closedir(d);
1728   return 0;
1729 }
1730
1731 int jlog_ctx_last_log_id(jlog_ctx *ctx, jlog_id *id) {
1732   ctx->last_error = JLOG_ERR_SUCCESS;
1733   if(ctx->context_mode != JLOG_READ) {
1734     ctx->last_error = JLOG_ERR_ILLEGAL_WRITE;
1735     ctx->last_errno = EPERM;
1736     return -1;
1737   }
1738   if (__jlog_restore_metastore(ctx, 0) != 0) return -1;
1739   ___jlog_resync_index(ctx, ctx->meta->storage_log, id, NULL);
1740   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1741   return -1;
1742 }
1743
1744 int jlog_ctx_advance_id(jlog_ctx *ctx, jlog_id *cur,
1745                         jlog_id *start, jlog_id *finish)
1746 {
1747   int rv;
1748   if(memcmp(cur, finish, sizeof(jlog_id))) {
1749     start->marker++;
1750   } else {
1751     if((rv = __jlog_find_first_log_after(ctx, cur, start, finish)) != 0) {
1752       return rv;
1753     }
1754     if(cur->log != start->log) {
1755       start->marker = 1;
1756     }
1757     else start->marker = cur->marker;
1758   }
1759   return 0;
1760 }
1761
/*
 * Decide whether `f` is a data file name: exactly eight lowercase
 * hexadecimal digits and nothing else.
 *
 * Returns 1 and, if `logid` is non-NULL, stores the parsed value there;
 * returns 0 (leaving *logid untouched) otherwise.
 */
static int is_datafile(const char *f, u_int32_t *logid) {
  u_int32_t value = 0;
  int pos;

  for(pos = 0; pos < 8; pos++) {
    const char c = f[pos];
    u_int32_t nibble;

    if(c >= '0' && c <= '9') {
      nibble = (u_int32_t)(c - '0');
    } else if(c >= 'a' && c <= 'f') {
      nibble = (u_int32_t)(c - 'a' + 10);
    } else {
      /* Also covers a name shorter than eight characters (hits NUL). */
      return 0;
    }
    value = (value << 4) | nibble;
  }

  if(f[8] != '\0') return 0;
  if(logid) *logid = value;
  return 1;
}
1778
1779 int jlog_clean(const char *file) {
1780   int rv = -1;
1781   u_int32_t earliest = 0;
1782   jlog_ctx *log;
1783   DIR *dir;
1784   struct dirent *de;
1785
1786   log = jlog_new(file);
1787   jlog_ctx_open_writer(log);
1788   dir = opendir(file);
1789   if(!dir) goto out;
1790
1791   earliest = 0;
1792   if(jlog_pending_readers(log, 0, &earliest) < 0) goto out;
1793
1794   rv = 0;
1795   while((de = readdir(dir)) != NULL) {
1796     u_int32_t logid;
1797     if(is_datafile(de->d_name, &logid) && logid < earliest) {
1798       char fullfile[MAXPATHLEN];
1799       char fullidx[MAXPATHLEN];
1800       snprintf(fullfile, sizeof(fullfile), "%s/%s", file, de->d_name);
1801       snprintf(fullidx, sizeof(fullidx), "%s/%s" INDEX_EXT, file, de->d_name);
1802       (void)unlink(fullfile);
1803       (void)unlink(fullidx); /* this may not exist; don't care */
1804       rv++;
1805     }
1806   }
1807   closedir(dir);
1808  out:
1809   jlog_ctx_close(log);
1810   return rv;
1811 }
1812
1813 /* vim:se ts=2 sw=2 et: */
Note: See TracBrowser for help on using the browser.