root/jlog.c

Revision 34720bdee5557970113da980f5e1a67d97430d88, 79.6 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 4 months ago)

flush precommit buffer on close

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2005-2008, Message Systems, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *    * Redistributions of source code must retain the above copyright
10  *      notice, this list of conditions and the following disclaimer.
11  *    * Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials provided
14  *      with the distribution.
15  *    * Neither the name Message Systems, Inc. nor the names
16  *      of its contributors may be used to endorse or promote products
17  *      derived from this software without specific prior written
18  *      permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 /*****************************************************************
34
35   Journaled logging... append only.
36
37       (1) find the current file, or allocate an extensible file and mark
38           it current.
39  
40       (2) Write records to it; records include their size, so
41           a simple inspection can detect an incomplete trailing
42           record.
43    
44       (3) Keep appending until the file reaches a certain size.
45
46       (4) Allocate a file, extensible.
47
48       (5) RESYNC INDEX on the 'finished' file (see reading:3) and append
49           an offset of '0' to the index, marking the file closed.
50    
51       (6) goto (1)
52    
53   Reading journals...
54
55       (1) find oldest checkpoint of all subscribers, remove all older files.
56
57       (2) (file, last_read) = find_checkpoint for this subscriber
58
59       (3) RESYNC INDEX:
60           open the record index for the file and seek to the end minus one
61           entry; this is the offset of the last noticed record in this file.
62           Open the data file, seek to this point and roll forward, writing
63           the index file. _Do not_ write an offset for the last record unless
64           it is found complete.
65
66       (4) read entries from last_read+1 -> index of record index
67
68 */
69
70 #include <stdio.h>
71
72 #include "jlog_config.h"
73 #include "jlog_private.h"
74 #include "jlog_compress.h"
75 #if HAVE_UNISTD_H
76 #include <unistd.h>
77 #endif
78 #if HAVE_SYS_UIO_H
79 #include <sys/uio.h>
80 #endif
81 #if HAVE_SYS_TIME_H
82 #include <sys/time.h>
83 #endif
84 #if HAVE_DIRENT_H
85 #include <dirent.h>
86 #endif
87 #if HAVE_FCNTL_H
88 #include <fcntl.h>
89 #endif
90 #if HAVE_ERRNO_H
91 #include <errno.h>
92 #endif
93 #if HAVE_TIME_H
94 #include <time.h>
95 #endif
96 #if HAVE_SYS_MMAN_H
97 #include <sys/mman.h>
98 #endif
99
100 #include "fassert.h"
101 #include <pthread.h>
102
/* Number of index entries handled per batch; sizes the on-stack `indices`
 * array used while resynchronising an index file. */
#define BUFFERED_INDICES 1024

/* Default pre-commit buffer size.  NOTE(review): presumably 0 means "no
 * pre-commit buffering" -- confirm where this default is consumed. */
#define PRE_COMMIT_BUFFER_SIZE_DEFAULT 0

/* Non-zero when every DEFAULT_HDR_MAGIC_COMPRESSION bit is set in the
 * metastore header magic, i.e. records use the compressed header layout. */
#define IS_COMPRESS_MAGIC(ctx) \
  ((((ctx)->meta->hdr_magic) & DEFAULT_HDR_MAGIC_COMPRESSION) == \
   DEFAULT_HDR_MAGIC_COMPRESSION)
106
107
108 static jlog_file *__jlog_open_writer(jlog_ctx *ctx);
109 static int __jlog_close_writer(jlog_ctx *ctx);
110 static jlog_file *__jlog_open_reader(jlog_ctx *ctx, u_int32_t log);
111 static int __jlog_close_reader(jlog_ctx *ctx);
112 static int __jlog_close_checkpoint(jlog_ctx *ctx);
113 static jlog_file *__jlog_open_indexer(jlog_ctx *ctx, u_int32_t log);
114 static int __jlog_close_indexer(jlog_ctx *ctx);
115 static int __jlog_resync_index(jlog_ctx *ctx, u_int32_t log, jlog_id *last, int *c);
116 static jlog_file *__jlog_open_named_checkpoint(jlog_ctx *ctx, const char *cpname, int flags);
117 static int __jlog_mmap_reader(jlog_ctx *ctx, u_int32_t log);
118 static int __jlog_munmap_reader(jlog_ctx *ctx);
119 static int __jlog_metastore_atomic_increment(jlog_ctx *ctx);
120
121 int jlog_snprint_logid(char *b, int n, const jlog_id *id) {
122   return snprintf(b, n, "%08x:%08x", id->log, id->marker);
123 }
124
125 int jlog_repair_datafile(jlog_ctx *ctx, u_int32_t log)
126 {
127   jlog_message_header_compressed hdr;
128   size_t hdr_size = sizeof(jlog_message_header);
129   uint32_t *message_disk_len = &hdr.mlen;
130   if (IS_COMPRESS_MAGIC(ctx)) {
131     hdr_size = sizeof(jlog_message_header_compressed);
132     message_disk_len = &hdr.compressed_len;
133   }
134   char *this, *next, *afternext = NULL, *mmap_end;
135   int i, invalid_count = 0;
136   struct {
137     off_t start, end;
138   } *invalid = NULL;
139   off_t orig_len, src, dst, len;
140
141 #define TAG_INVALID(s, e) do { \
142   if (invalid_count) \
143     invalid = realloc(invalid, (invalid_count + 1) * sizeof(*invalid)); \
144   else \
145     invalid = malloc(sizeof(*invalid)); \
146   invalid[invalid_count].start = s - (char *)ctx->mmap_base; \
147   invalid[invalid_count].end = e - (char *)ctx->mmap_base; \
148   invalid_count++; \
149 } while (0)
150
151   ctx->last_error = JLOG_ERR_SUCCESS;
152
153   /* we want the reader's open logic because this runs in the read path
154    * the underlying fds are always RDWR anyway */
155   __jlog_open_reader(ctx, log);
156   if (!ctx->data) {
157     ctx->last_error = JLOG_ERR_FILE_OPEN;
158     ctx->last_errno = errno;
159     return -1;
160   }
161   if (!jlog_file_lock(ctx->data)) {
162     ctx->last_error = JLOG_ERR_LOCK;
163     ctx->last_errno = errno;
164     return -1;
165   }
166   if (__jlog_mmap_reader(ctx, log) != 0)
167     SYS_FAIL(JLOG_ERR_FILE_READ);
168
169   orig_len = ctx->mmap_len;
170   mmap_end = (char*)ctx->mmap_base + ctx->mmap_len;
171   /* these values will cause us to fall right into the error clause and
172    * start searching for a valid header from offset 0 */
173   this = (char*)ctx->mmap_base - hdr_size;
174   hdr.reserved = ctx->meta->hdr_magic;
175   hdr.mlen = 0;
176
177   while (this + hdr_size <= mmap_end) {
178     next = this + hdr_size + *message_disk_len;
179     if (next <= (char *)ctx->mmap_base) goto error;
180     if (next == mmap_end) {
181       this = next;
182       break;
183     }
184     if (next + hdr_size > mmap_end) goto error;
185     memcpy(&hdr, next, hdr_size);
186     if (hdr.reserved != ctx->meta->hdr_magic) goto error;
187     this = next;
188     continue;
189   error:
190     for (next = this + hdr_size; next + hdr_size <= mmap_end; next++) {
191       memcpy(&hdr, next, hdr_size);
192       if (hdr.reserved == ctx->meta->hdr_magic) {
193         afternext = next + hdr_size + *message_disk_len;
194         if (afternext <= (char *)ctx->mmap_base) continue;
195         if (afternext == mmap_end) break;
196         if (afternext + hdr_size > mmap_end) continue;
197         memcpy(&hdr, afternext, hdr_size);
198         if (hdr.reserved == ctx->meta->hdr_magic) break;
199       }
200     }
201     /* correct for while loop entry condition */
202     if (this < (char *)ctx->mmap_base) this = ctx->mmap_base;
203     if (next + hdr_size > mmap_end) break;
204     if (next > this) TAG_INVALID(this, next);
205     this = afternext;
206   }
207   if (this != mmap_end) TAG_INVALID(this, mmap_end);
208
209 #undef TAG_INVALID
210
211 #define MOVE_SEGMENT do { \
212   char cpbuff[4096]; \
213   off_t chunk; \
214   while(len > 0) { \
215     chunk = len; \
216     if (chunk > sizeof(cpbuff)) chunk = sizeof(cpbuff); \
217     if (!jlog_file_pread(ctx->data, &cpbuff, chunk, src)) \
218       SYS_FAIL(JLOG_ERR_FILE_READ); \
219     if (!jlog_file_pwrite(ctx->data, &cpbuff, chunk, dst)) \
220       SYS_FAIL(JLOG_ERR_FILE_WRITE); \
221     src += chunk; \
222     dst += chunk; \
223     len -= chunk; \
224   } \
225 } while (0)
226
227   if (invalid_count > 0) {
228     __jlog_munmap_reader(ctx);
229     dst = invalid[0].start;
230     for (i = 0; i < invalid_count - 1; ) {
231       src = invalid[i].end;
232       len = invalid[++i].start - src;
233       MOVE_SEGMENT;
234     }
235     src = invalid[invalid_count - 1].end;
236     len = orig_len - src;
237     if (len > 0) MOVE_SEGMENT;
238     if (!jlog_file_truncate(ctx->data, dst))
239       SYS_FAIL(JLOG_ERR_FILE_WRITE);
240   }
241
242 #undef MOVE_SEGMENT
243
244 finish:
245   jlog_file_unlock(ctx->data);
246   if (invalid) free(invalid);
247   if (ctx->last_error != JLOG_ERR_SUCCESS) return -1;
248   return invalid_count;
249 }
250
251 int jlog_inspect_datafile(jlog_ctx *ctx, u_int32_t log, int verbose)
252 {
253   jlog_message_header_compressed hdr;
254   size_t hdr_size = sizeof(jlog_message_header);
255   uint32_t *message_disk_len = &hdr.mlen;
256   char *this, *next, *mmap_end;
257   int i;
258   time_t timet;
259   struct tm tm;
260   char tbuff[128];
261
262   if (IS_COMPRESS_MAGIC(ctx)) {
263     hdr_size = sizeof(jlog_message_header_compressed);
264     message_disk_len = &hdr.compressed_len;
265   }
266
267   ctx->last_error = JLOG_ERR_SUCCESS;
268
269   __jlog_open_reader(ctx, log);
270   if (!ctx->data)
271     SYS_FAIL(JLOG_ERR_FILE_OPEN);
272   if (__jlog_mmap_reader(ctx, log) != 0)
273     SYS_FAIL(JLOG_ERR_FILE_READ);
274
275   mmap_end = (char*)ctx->mmap_base + ctx->mmap_len;
276   this = ctx->mmap_base;
277   i = 0;
278   while (this + hdr_size <= mmap_end) {
279     int initial = 1;
280     memcpy(&hdr, this, hdr_size);
281     i++;
282     if (hdr.reserved != ctx->meta->hdr_magic) {
283       fprintf(stderr, "Message %d at [%ld] has invalid reserved value %u\n",
284               i, (long int)(this - (char *)ctx->mmap_base), hdr.reserved);
285       return 1;
286     }
287
288 #define PRINTMSGHDR do { if(initial) { \
289   fprintf(stderr, "Message %d at [%ld] of (%lu+%u)", i, \
290           (long int)(this - (char *)ctx->mmap_base), \
291           (long unsigned int)hdr_size, *message_disk_len); \
292   initial = 0; \
293 } } while(0)
294
295     if(verbose) {
296       PRINTMSGHDR;
297     }
298
299     next = this + hdr_size + *message_disk_len;
300     if (next <= (char *)ctx->mmap_base) {
301       PRINTMSGHDR;
302       fprintf(stderr, " WRAPPED TO NEGATIVE OFFSET!\n");
303       return 1;
304     }
305     if (next > mmap_end) {
306       PRINTMSGHDR;
307       fprintf(stderr, " OFF THE END!\n");
308       return 1;
309     }
310
311     timet = hdr.tv_sec;
312     localtime_r(&timet, &tm);
313     strftime(tbuff, sizeof(tbuff), "%c", &tm);
314     if(verbose) fprintf(stderr, "\n\ttime: %s\n\tmlen: %u\n", tbuff, hdr.mlen);
315     this = next;
316   }
317   if (this < mmap_end) {
318     fprintf(stderr, "%ld bytes of junk at the end\n",
319             (long int)(mmap_end - this));
320     return 1;
321   }
322
323   return 0;
324 finish:
325   return -1;
326 }
327
328 int jlog_idx_details(jlog_ctx *ctx, u_int32_t log,
329                      u_int32_t *marker, int *closed)
330 {
331   off_t index_len;
332   u_int64_t index;
333
334   __jlog_open_indexer(ctx, log);
335   if (!ctx->index)
336     SYS_FAIL(JLOG_ERR_IDX_OPEN);
337   if ((index_len = jlog_file_size(ctx->index)) == -1)
338     SYS_FAIL(JLOG_ERR_IDX_SEEK);
339   if (index_len % sizeof(u_int64_t))
340     SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
341   if (index_len > sizeof(u_int64_t)) {
342     if (!jlog_file_pread(ctx->index, &index, sizeof(u_int64_t),
343                          index_len - sizeof(u_int64_t)))
344     {
345       SYS_FAIL(JLOG_ERR_IDX_READ);
346     }
347     if (index) {
348       *marker = index_len / sizeof(u_int64_t);
349       *closed = 0;
350     } else {
351       *marker = (index_len / sizeof(u_int64_t)) - 1;
352       *closed = 1;
353     }
354   } else {
355     *marker = index_len / sizeof(u_int64_t);
356     *closed = 0;
357   }
358
359   return 0;
360 finish:
361   return -1;
362 }
363
364 static int __jlog_unlink_datafile(jlog_ctx *ctx, u_int32_t log) {
365   char file[MAXPATHLEN];
366   int len;
367
368   memset(file, 0, sizeof(file));
369   if(ctx->current_log == log) {
370     __jlog_close_reader(ctx);
371     __jlog_close_indexer(ctx);
372   }
373
374   STRSETDATAFILE(ctx, file, log);
375 #ifdef DEBUG
376   fprintf(stderr, "unlinking %s\n", file);
377 #endif
378   unlink(file);
379
380   len = strlen(file);
381   if((len + sizeof(INDEX_EXT)) > sizeof(file)) return -1;
382   memcpy(file + len, INDEX_EXT, sizeof(INDEX_EXT));
383 #ifdef DEBUG
384   fprintf(stderr, "unlinking %s\n", file);
385 #endif
386   unlink(file);
387   return 0;
388 }
389
390 static int __jlog_open_metastore(jlog_ctx *ctx)
391 {
392   char file[MAXPATHLEN];
393   int len;
394
395   memset(file, 0, sizeof(file));
396 #ifdef DEBUG
397   fprintf(stderr, "__jlog_open_metastore\n");
398 #endif
399   len = strlen(ctx->path);
400   if((len + 1 /* IFS_CH */ + 9 /* "metastore" */ + 1) > MAXPATHLEN) {
401 #ifdef ENAMETOOLONG
402     ctx->last_errno = ENAMETOOLONG;
403 #endif
404     FASSERT(0, "__jlog_open_metastore: filename too long");
405     ctx->last_error = JLOG_ERR_CREATE_META;
406     return -1;
407   }
408   memcpy(file, ctx->path, len);
409   file[len++] = IFS_CH;
410   memcpy(&file[len], "metastore", 10); /* "metastore" + '\0' */
411
412   ctx->metastore = jlog_file_open(file, O_CREAT, ctx->file_mode, ctx->multi_process);
413
414   if (!ctx->metastore) {
415     ctx->last_errno = errno;
416     FASSERT(0, "__jlog_open_metastore: file create failed");
417     ctx->last_error = JLOG_ERR_CREATE_META;
418     return -1;
419   }
420
421   return 0;
422 }
423
424 static int __jlog_open_pre_commit(jlog_ctx *ctx)
425 {
426   char file[MAXPATHLEN] = {0};
427   int len = 0;
428
429 #ifdef DEBUG
430   fprintf(stderr, "__jlog_open_pre_commit\n");
431 #endif
432   len = strlen(ctx->path);
433   if((len + 1 /* IFS_CH */ + 10 /* "pre_commit" */ + 1) > MAXPATHLEN) {
434 #ifdef ENAMETOOLONG
435     ctx->last_errno = ENAMETOOLONG;
436 #endif
437     FASSERT(0, "__jlog_open_pre_commit: filename too long");
438     ctx->last_error = JLOG_ERR_CREATE_PRE_COMMIT;
439     return -1;
440   }
441   memcpy(file, ctx->path, len);
442   file[len++] = IFS_CH;
443   memcpy(&file[len], "pre_commit", 11); /* "pre_commit" + '\0' */
444
445   ctx->pre_commit = jlog_file_open(file, O_CREAT, ctx->file_mode, ctx->multi_process);
446
447   if (!ctx->pre_commit) {
448     ctx->last_errno = errno;
449     FASSERT(0, "__jlog_open_pre_commit: file create failed");
450     ctx->last_error = JLOG_ERR_CREATE_PRE_COMMIT;
451     return -1;
452   }
453
454   return 0;
455 }
456
457
458 /* exported */
459 int __jlog_pending_readers(jlog_ctx *ctx, u_int32_t log) {
460   return jlog_pending_readers(ctx, log, NULL);
461 }
462 int jlog_pending_readers(jlog_ctx *ctx, u_int32_t log,
463                          u_int32_t *earliest_out) {
464   int readers;
465   DIR *dir;
466   struct dirent *ent;
467   char file[MAXPATHLEN];
468   int len, seen = 0;
469   u_int32_t earliest = 0;
470   jlog_id id;
471
472   memset(file, 0, sizeof(file));
473   readers = 0;
474
475   dir = opendir(ctx->path);
476   if (!dir) return -1;
477
478   len = strlen(ctx->path);
479   if(len + 2 > sizeof(file)) {
480     closedir(dir);
481     return -1;
482   }
483   memcpy(file, ctx->path, len);
484   file[len++] = IFS_CH;
485   file[len] = '\0';
486
487   while ((ent = readdir(dir))) {
488     if (ent->d_name[0] == 'c' && ent->d_name[1] == 'p' && ent->d_name[2] == '.') {
489       jlog_file *cp;
490       int dlen;
491
492       dlen = strlen(ent->d_name);
493       if((len + dlen + 1) > sizeof(file)) continue;
494       memcpy(file + len, ent->d_name, dlen + 1); /* include \0 */
495 #ifdef DEBUG
496       fprintf(stderr, "Checking if %s needs %s...\n", ent->d_name, ctx->path);
497 #endif
498       if ((cp = jlog_file_open(file, 0, ctx->file_mode, ctx->multi_process))) {
499         if (jlog_file_lock(cp)) {
500           (void) jlog_file_pread(cp, &id, sizeof(id), 0);
501 #ifdef DEBUG
502           fprintf(stderr, "\t%u <= %u (pending reader)\n", id.log, log);
503 #endif
504           if (!seen) {
505             earliest = id.log;
506             seen = 1;
507           }
508           else {
509             if(id.log < earliest) {
510               earliest = id.log;
511             }
512           }
513           if (id.log <= log) {
514             readers++;
515           }
516           jlog_file_unlock(cp);
517         }
518         jlog_file_close(cp);
519       }
520     }
521   }
522   closedir(dir);
523   if(earliest_out) *earliest_out = earliest;
524   return readers;
525 }
/* Growable subscriber-name list built by jlog_ctx_list_subscribers(). */
struct _jlog_subs {
  char **subs;      /* name array; subs[used] is kept NULL as terminator */
  int used, allocd; /* filled entries / allocated slots */
};
531
532 int jlog_ctx_list_subscribers_dispose(jlog_ctx *ctx, char **subs) {
533   char *s;
534   int i = 0;
535   if(subs) {
536     while((s = subs[i++]) != NULL) free(s);
537     free(subs);
538   }
539   return 0;
540 }
541
542 int jlog_ctx_list_subscribers(jlog_ctx *ctx, char ***subs) {
543   struct _jlog_subs js = { NULL, 0, 0 };
544   DIR *dir;
545   struct dirent *ent;
546   unsigned char file[MAXPATHLEN];
547   char *p;
548   int len;
549
550   memset(file, 0, sizeof(file));
551   js.subs = calloc(16, sizeof(char *));
552   js.allocd = 16;
553
554   dir = opendir(ctx->path);
555   if (!dir) return -1;
556   while ((ent = readdir(dir))) {
557     if (ent->d_name[0] == 'c' && ent->d_name[1] == 'p' && ent->d_name[2] == '.') {
558
559       for (len = 0, p = ent->d_name + 3; *p;) {
560         unsigned char c;
561         int i;
562
563         for (c = 0, i = 0; i < 16; i++) {
564           if (__jlog_hexchars[i] == *p) {
565             c = i << 4;
566             break;
567           }
568         }
569         p++;
570         for (i = 0; i < 16; i++) {
571           if (__jlog_hexchars[i] == *p) {
572             c |= i;
573             break;
574           }
575         }
576         p++;
577         file[len++] = c;
578       }
579       file[len] = '\0';
580
581       js.subs[js.used++] = strdup((char *)file);
582       if(js.used == js.allocd) {
583         js.allocd *= 2;
584         js.subs = realloc(js.subs, js.allocd*sizeof(char *));
585       }
586       js.subs[js.used] = NULL;
587     }
588   }
589   closedir(dir);
590   *subs = js.subs;
591   return js.used;
592 }
593
594 static int __jlog_save_metastore(jlog_ctx *ctx, int ilocked)
595 {
596 #ifdef DEBUG
597   fprintf(stderr, "__jlog_save_metastore\n");
598 #endif
599
600   if (!ilocked && !jlog_file_lock(ctx->metastore)) {
601     FASSERT(0, "__jlog_save_metastore: cannot get lock");
602     ctx->last_error = JLOG_ERR_LOCK;
603     return -1;
604   }
605
606   if(ctx->meta_is_mapped) {
607     int rv, flags = MS_INVALIDATE;
608     if(ctx->meta->safety == JLOG_SAFE) flags |= MS_SYNC;
609     rv = msync((void *)(ctx->meta), sizeof(*ctx->meta), flags);
610     FASSERT(rv >= 0, "jlog_save_metastore");
611     if (!ilocked) jlog_file_unlock(ctx->metastore);
612     if ( rv < 0 )
613       ctx->last_error = JLOG_ERR_FILE_WRITE;
614     return rv;
615   }
616   else {
617     if (!jlog_file_pwrite(ctx->metastore, ctx->meta, sizeof(*ctx->meta), 0)) {
618       if (!ilocked) jlog_file_unlock(ctx->metastore);
619       FASSERT(0, "jlog_file_pwrite failed");
620       ctx->last_error = JLOG_ERR_FILE_WRITE;
621       return -1;
622     }
623     if (ctx->meta->safety == JLOG_SAFE) {
624       jlog_file_sync(ctx->metastore);
625     }
626   }
627
628   if (!ilocked) jlog_file_unlock(ctx->metastore);
629   return 0;
630 }
631
632 static int __jlog_restore_metastore(jlog_ctx *ctx, int ilocked)
633 {
634   void *base = NULL;
635   size_t len = 0;
636   if(ctx->meta_is_mapped) return 0;
637 #ifdef DEBUG
638   fprintf(stderr, "__jlog_restore_metastore\n");
639 #endif
640
641   if (!ilocked && !jlog_file_lock(ctx->metastore)) {
642     FASSERT(0, "__jlog_restore_metastore: cannot get lock");
643     ctx->last_error = JLOG_ERR_LOCK;
644     return -1;
645   }
646
647   if(ctx->meta_is_mapped == 0) {
648     int rv;
649     rv = jlog_file_map_rdwr(ctx->metastore, &base, &len);
650     FASSERT(rv == 1, "jlog_file_map_rdwr");
651     if(rv != 1) {
652       if (!ilocked) jlog_file_unlock(ctx->metastore);
653       ctx->last_error = JLOG_ERR_OPEN;
654       return -1;
655     }
656     if(len == 12) {
657       /* old metastore format doesn't have the new magic hdr in it
658        * we need to extend it by four bytes, but we know the hdr was
659        * previously 0, so we write out zero.
660        */
661        u_int32_t dummy = 0;
662        jlog_file_pwrite(ctx->metastore, &dummy, sizeof(dummy), 12);
663        rv = jlog_file_map_rdwr(ctx->metastore, &base, &len);
664     }
665     FASSERT(rv == 1, "jlog_file_map_rdwr");
666     if(rv != 1 || len != sizeof(*ctx->meta)) {
667       if (!ilocked) jlog_file_unlock(ctx->metastore);
668       ctx->last_error = JLOG_ERR_OPEN;
669       return -1;
670     }
671     ctx->meta = base;
672     ctx->meta_is_mapped = 1;
673
674     if (IS_COMPRESS_MAGIC(ctx)) {
675       jlog_set_compression_provider(ctx->meta->hdr_magic & 0xFF);
676     }
677   }
678
679   if (!ilocked) jlog_file_unlock(ctx->metastore);
680
681   if(ctx->meta != &ctx->pre_init)
682     ctx->pre_init.hdr_magic = ctx->meta->hdr_magic;
683   return 0;
684 }
685
686 static int __jlog_map_pre_commit(jlog_ctx *ctx)
687 {
688   off_t pre_commit_size = 0;
689 #ifdef DEBUG
690   fprintf(stderr, "__jlog_map_pre_commit\n");
691 #endif
692
693   if(ctx->pre_commit_is_mapped == 1) {
694     return 0;
695   }
696
697   if (!jlog_file_lock(ctx->pre_commit)) {
698     FASSERT(0, "__jlog_map_pre_commit: cannot get lock");
699     ctx->last_error = JLOG_ERR_LOCK;
700     return -1;
701   }
702
703   pre_commit_size = jlog_file_size(ctx->pre_commit);
704   if (pre_commit_size == 0) {
705     /* fill the pre_commit file with zero'd memory to hold incoming messages for block writes */
706     /* add space for the offset in the file at the front of the buffer */
707     char *space = calloc(1, ctx->pre_commit_buffer_len + sizeof(uint32_t));
708     if (!jlog_file_pwrite(ctx->pre_commit, space, ctx->pre_commit_buffer_len + sizeof(uint32_t), 0)) {
709       jlog_file_unlock(ctx->pre_commit);
710       FASSERT(0, "jlog_file_pwrite failed");
711       ctx->last_error = JLOG_ERR_FILE_WRITE;
712       free(space);
713       return -1;
714     }
715     if (ctx->meta->safety == JLOG_SAFE) {
716       jlog_file_sync(ctx->pre_commit);
717     }
718
719     pre_commit_size = jlog_file_size(ctx->pre_commit);
720     free(space);
721   }
722  
723   /* now map it */
724   if (jlog_file_map_rdwr(ctx->pre_commit, &ctx->pre_commit_buffer, &ctx->pre_commit_buffer_len) == 0) {
725       jlog_file_unlock(ctx->pre_commit);
726       FASSERT(0, "jlog_file_map_rdwr failed");
727       ctx->last_error = JLOG_ERR_PRE_COMMIT_OPEN;
728       return -1;
729   }
730
731   /* set our file pointer to the prefix space in the buffer */
732   ctx->pre_commit_pointer = (uint32_t *)(ctx->pre_commit_buffer);
733
734   /* move the writable buffer past the offset pointer */
735   ctx->pre_commit_buffer = ctx->pre_commit_buffer + sizeof(uint32_t);
736   ctx->pre_commit_end = ctx->pre_commit_buffer + ctx->pre_commit_buffer_len;
737
738   /* restore the current pos */
739   ctx->pre_commit_pos = ctx->pre_commit_buffer + *ctx->pre_commit_pointer;
740
741   /* we're good */
742   ctx->pre_commit_is_mapped = 1;
743   jlog_file_unlock(ctx->pre_commit);
744   return 0;
745 }
746
747
748 int jlog_get_checkpoint(jlog_ctx *ctx, const char *s, jlog_id *id) {
749   jlog_file *f;
750   int rv = -1;
751
752   if(ctx->subscriber_name && !strcmp(ctx->subscriber_name, s)) {
753     if(!ctx->checkpoint) {
754       ctx->checkpoint = __jlog_open_named_checkpoint(ctx, s, 0);
755     }
756     f = ctx->checkpoint;
757   } else
758     f = __jlog_open_named_checkpoint(ctx, s, 0);
759
760   if (f) {
761     if (jlog_file_lock(f)) {
762       if (jlog_file_pread(f, id, sizeof(*id), 0)) rv = 0;
763       jlog_file_unlock(f);
764     }
765   }
766   if (f && f != ctx->checkpoint) jlog_file_close(f);
767   return rv;
768 }
769
770 static int __jlog_set_checkpoint(jlog_ctx *ctx, const char *s, const jlog_id *id)
771 {
772   jlog_file *f;
773   int rv = -1;
774   jlog_id old_id;
775   u_int32_t log;
776
777   if(ctx->subscriber_name && !strcmp(ctx->subscriber_name, s)) {
778     if(!ctx->checkpoint) {
779       ctx->checkpoint = __jlog_open_named_checkpoint(ctx, s, 0);
780     }
781     f = ctx->checkpoint;
782   } else
783     f = __jlog_open_named_checkpoint(ctx, s, 0);
784
785   if(!f) return -1;
786   if (!jlog_file_lock(f))
787     goto failset;
788
789   if (jlog_file_size(f) == 0) {
790     /* we're setting it for the first time, no segments were pending on it */
791     old_id.log = id->log;
792   } else {
793     if (!jlog_file_pread(f, &old_id, sizeof(old_id), 0))
794       goto failset;
795   }
796   if (!jlog_file_pwrite(f, id, sizeof(*id), 0)) {
797     FASSERT(0, "jlog_file_pwrite failed in jlog_set_checkpoint");
798     ctx->last_error = JLOG_ERR_FILE_WRITE;
799     goto failset;
800   }
801   if (ctx->meta->safety == JLOG_SAFE) {
802     jlog_file_sync(f);
803   }
804   jlog_file_unlock(f);
805   rv = 0;
806
807   for (log = old_id.log; log < id->log; log++) {
808     if (__jlog_pending_readers(ctx, log) == 0) {
809       __jlog_unlink_datafile(ctx, log);
810     }
811   }
812
813  failset:
814   if (f && f != ctx->checkpoint) jlog_file_close(f);
815   return rv;
816 }
817
818 static int __jlog_close_metastore(jlog_ctx *ctx) {
819   if (ctx->metastore) {
820     jlog_file_close(ctx->metastore);
821     ctx->metastore = NULL;
822   }
823   if (ctx->meta_is_mapped) {
824     munmap((void *)ctx->meta, sizeof(*ctx->meta));
825     ctx->meta = &ctx->pre_init;
826     ctx->meta_is_mapped = 0;
827   }
828   return 0;
829 }
830
831 static int __jlog_close_pre_commit(jlog_ctx *ctx) {
832   if (ctx->pre_commit) {
833     jlog_file_close(ctx->pre_commit);
834     ctx->pre_commit = NULL;
835   }
836   if (ctx->pre_commit_is_mapped) {
837     munmap((void *)ctx->pre_commit_buffer, ctx->pre_commit_buffer_len);
838     ctx->pre_commit_is_mapped = 0;
839   }
840   return 0;
841 }
842
843 /* path is assumed to be MAXPATHLEN */
844 static char *compute_checkpoint_filename(jlog_ctx *ctx, const char *subscriber, char *name)
845 {
846   const char *sub;
847   int len;
848
849   /* build checkpoint filename */
850   len = strlen(ctx->path);
851   memcpy(name, ctx->path, len);
852   name[len++] = IFS_CH;
853   name[len++] = 'c';
854   name[len++] = 'p';
855   name[len++] = '.';
856   for (sub = subscriber; *sub; ) {
857     name[len++] = __jlog_hexchars[((*sub & 0xf0) >> 4)];
858     name[len++] = __jlog_hexchars[(*sub & 0x0f)];
859     sub++;
860   }
861   name[len] = '\0';
862
863 #ifdef DEBUG
864   fprintf(stderr, "checkpoint %s filename is %s\n", subscriber, name);
865 #endif
866   return name;
867 }
868
869 static jlog_file *__jlog_open_named_checkpoint(jlog_ctx *ctx, const char *cpname, int flags)
870 {
871   char name[MAXPATHLEN];
872   compute_checkpoint_filename(ctx, cpname, name);
873   return jlog_file_open(name, flags, ctx->file_mode, ctx->multi_process);
874 }
875
876 static jlog_file *__jlog_open_reader(jlog_ctx *ctx, u_int32_t log) {
877   char file[MAXPATHLEN];
878
879   memset(file, 0, sizeof(file));
880   if(ctx->current_log != log) {
881     __jlog_close_reader(ctx);
882     __jlog_close_indexer(ctx);
883   }
884   if(ctx->data) {
885     return ctx->data;
886   }
887   STRSETDATAFILE(ctx, file, log);
888 #ifdef DEBUG
889   fprintf(stderr, "opening log file[ro]: '%s'\n", file);
890 #endif
891   ctx->data = jlog_file_open(file, 0, ctx->file_mode, ctx->multi_process);
892   ctx->current_log = log;
893   return ctx->data;
894 }
895
896 static int __jlog_munmap_reader(jlog_ctx *ctx) {
897   if(ctx->mmap_base) {
898     munmap(ctx->mmap_base, ctx->mmap_len);
899     ctx->mmap_base = NULL;
900     ctx->mmap_len = 0;
901   }
902   return 0;
903 }
904
905 static int __jlog_mmap_reader(jlog_ctx *ctx, u_int32_t log) {
906   if(ctx->current_log == log && ctx->mmap_base) return 0;
907   __jlog_open_reader(ctx, log);
908   if(!ctx->data)
909     return -1;
910   if (!jlog_file_map_read(ctx->data, &ctx->mmap_base, &ctx->mmap_len)) {
911     ctx->mmap_base = NULL;
912     ctx->last_error = JLOG_ERR_FILE_READ;
913     ctx->last_errno = errno;
914     return -1;
915   }
916   return 0;
917 }
918
919 static jlog_file *__jlog_open_writer(jlog_ctx *ctx) {
920   char file[MAXPATHLEN] = {0};
921
922   if(ctx->data) {
923     /* Still open */
924     return ctx->data;
925   }
926
927   FASSERT(ctx != NULL, "__jlog_open_writer");
928   if(!jlog_file_lock(ctx->metastore))
929     SYS_FAIL(JLOG_ERR_LOCK);
930   int x;
931   x = __jlog_restore_metastore(ctx, 1);
932   if(x) {
933     FASSERT(x == 0, "__jlog_open_writer calls jlog_restore_metastore");
934     SYS_FAIL(JLOG_ERR_META_OPEN);
935   }
936   ctx->current_log =  ctx->meta->storage_log;
937   STRSETDATAFILE(ctx, file, ctx->current_log);
938 #ifdef DEBUG
939   fprintf(stderr, "opening log file[rw]: '%s'\n", file);
940 #endif
941   ctx->data = jlog_file_open(file, O_CREAT, ctx->file_mode, ctx->multi_process);
942   FASSERT(ctx->data != NULL, "__jlog_open_writer calls jlog_file_open");
943   if ( ctx->data == NULL )
944     ctx->last_error = JLOG_ERR_FILE_OPEN;
945   else
946     ctx->last_error = JLOG_ERR_SUCCESS;
947  finish:
948   jlog_file_unlock(ctx->metastore);
949   return ctx->data;
950 }
951
952 static int __jlog_close_writer(jlog_ctx *ctx) {
953   if (ctx->data) {
954     jlog_file_close(ctx->data);
955     ctx->data = NULL;
956   }
957   return 0;
958 }
959
960 static int __jlog_close_reader(jlog_ctx *ctx) {
961   __jlog_munmap_reader(ctx);
962   if (ctx->data) {
963     jlog_file_close(ctx->data);
964     ctx->data = NULL;
965   }
966   return 0;
967 }
968
969 static int __jlog_close_checkpoint(jlog_ctx *ctx) {
970   if (ctx->checkpoint) {
971     jlog_file_close(ctx->checkpoint);
972     ctx->checkpoint = NULL;
973   }
974   return 0;
975 }
976
977 static jlog_file *__jlog_open_indexer(jlog_ctx *ctx, u_int32_t log) {
978   char file[MAXPATHLEN];
979   int len;
980
981   memset(file, 0, sizeof(file));
982   if(ctx->current_log != log) {
983     __jlog_close_reader(ctx);
984     __jlog_close_indexer(ctx);
985   }
986   if(ctx->index) {
987     return ctx->index;
988   }
989   STRSETDATAFILE(ctx, file, log);
990
991   len = strlen(file);
992   if((len + sizeof(INDEX_EXT)) > sizeof(file)) return NULL;
993   memcpy(file + len, INDEX_EXT, sizeof(INDEX_EXT));
994 #ifdef DEBUG
995   fprintf(stderr, "opening index file: '%s'\n", file);
996 #endif
997   ctx->index = jlog_file_open(file, O_CREAT, ctx->file_mode, ctx->multi_process);
998   ctx->current_log = log;
999   return ctx->index;
1000 }
1001
1002 static int __jlog_close_indexer(jlog_ctx *ctx) {
1003   if (ctx->index) {
1004     jlog_file_close(ctx->index);
1005     ctx->index = NULL;
1006   }
1007   return 0;
1008 }
1009
/*
 * Bring the index file of segment `log` up to date with its data file.
 *
 * Reads the last entry of the existing index (if any) to find where
 * indexing stopped, walks the data records after that point, and appends
 * one u_int64_t data offset per complete record.  When `log` is older than
 * meta->storage_log the segment can no longer grow, so a trailing 0 entry
 * is appended as the "closed" marker.
 *
 * last:   if non-NULL, receives {log, number of indexed records}.
 * closed: if non-NULL, set to 1 when the index is (or becomes) closed.
 * Returns 0 on success, -1 with ctx->last_error set.
 */
static int
___jlog_resync_index(jlog_ctx *ctx, u_int32_t log, jlog_id *last, int *closed)
{
  u_int64_t indices[BUFFERED_INDICES];
  jlog_message_header_compressed logmhdr;
  uint32_t *message_disk_len = &logmhdr.mlen;
  off_t index_off, data_off, data_len;
  size_t hdr_size = sizeof(jlog_message_header);
  u_int64_t index;
  int i, second_try = 0;

  /* compressed segments use the larger header, and the number of payload
   * bytes on disk is compressed_len rather than mlen */
  if (IS_COMPRESS_MAGIC(ctx)) {
    hdr_size = sizeof(jlog_message_header_compressed);
    message_disk_len = &logmhdr.compressed_len;
  }

  ctx->last_error = JLOG_ERR_SUCCESS;
  if(closed) *closed = 0;

  /* open the data file for this segment (sets ctx->data) */
  __jlog_open_reader(ctx, log);
  if (!ctx->data) {
    ctx->last_error = JLOG_ERR_FILE_OPEN;
    ctx->last_errno = errno;
    return -1;
  }

/* RESTART: on the first failure, truncate the index back to index_off,
 * drop the index lock and rescan once from the top; a second failure is
 * reported as JLOG_ERR_IDX_CORRUPT.
 * NOTE(review): for the "misaligned size" and "offset beyond data" cases
 * index_off still equals the index file size, so that truncate appears to
 * be a no-op and the retry likely fails the same way; the caller
 * (__jlog_resync_index) then repairs older segments. */
#define RESTART do { \
  if (second_try == 0) { \
    jlog_file_truncate(ctx->index, index_off); \
    jlog_file_unlock(ctx->index); \
    second_try = 1; \
    ctx->last_error = JLOG_ERR_SUCCESS; \
    goto restart; \
  } \
  SYS_FAIL(JLOG_ERR_IDX_CORRUPT); \
} while (0)

restart:
  __jlog_open_indexer(ctx, log);
  if (!ctx->index) {
    ctx->last_error = JLOG_ERR_IDX_OPEN;
    ctx->last_errno = errno;
    return -1;
  }
  if (!jlog_file_lock(ctx->index)) {
    ctx->last_error = JLOG_ERR_LOCK;
    ctx->last_errno = errno;
    return -1;
  }

  data_off = 0;
  if ((data_len = jlog_file_size(ctx->data)) == -1)
    SYS_FAIL(JLOG_ERR_FILE_SEEK);
  if ((index_off = jlog_file_size(ctx->index)) == -1)
    SYS_FAIL(JLOG_ERR_IDX_SEEK);

  /* the index is an array of u_int64_t; a ragged size means a torn write */
  if (index_off % sizeof(u_int64_t)) {
#ifdef DEBUG
    fprintf(stderr, "corrupt index [%llu]\n", index_off);
#endif
    RESTART;
  }

  /* With at least two entries, inspect the last one.  A lone entry is not
   * inspected: the closing 0 is only ever written after at least one
   * offset (see the index_off check near the end), so a single entry is
   * the offset of the first record and data_off stays 0. */
  if (index_off > sizeof(u_int64_t)) {
    if (!jlog_file_pread(ctx->index, &index, sizeof(index),
                         index_off - sizeof(u_int64_t)))
    {
      SYS_FAIL(JLOG_ERR_IDX_READ);
    }
    if (index == 0) {
      /* This log file has been "closed" */
#ifdef DEBUG
      fprintf(stderr, "index closed\n");
#endif
      if(last) {
        last->log = log;
        /* entry count, excluding the trailing close marker */
        last->marker = (index_off / sizeof(u_int64_t)) - 1;
      }
      if(closed) *closed = 1;
      goto finish;
    } else {
      if (index > data_len) {
#ifdef DEBUG
        fprintf(stderr, "index told me to seek somehwere I can't\n");
#endif
        RESTART;
      }
      data_off = index;
    }
  }

  if (index_off > 0) {
    /* We are adding onto a partial index so we must advance a record */
    if (!jlog_file_pread(ctx->data, &logmhdr, hdr_size, data_off))
      SYS_FAIL(JLOG_ERR_FILE_READ);
    if ((data_off += hdr_size + *message_disk_len) > data_len)
      RESTART;
  }

  /* index every complete record after data_off, buffering offsets and
   * writing them BUFFERED_INDICES at a time */
  i = 0;
  while (data_off + hdr_size <= data_len) {
    off_t next_off = data_off;

    if (!jlog_file_pread(ctx->data, &logmhdr, hdr_size, data_off))
      SYS_FAIL(JLOG_ERR_FILE_READ);
    if (logmhdr.reserved != ctx->meta->hdr_magic) {
#ifdef DEBUG
      fprintf(stderr, "logmhdr.reserved == %d\n", logmhdr.reserved);
#endif
      SYS_FAIL(JLOG_ERR_FILE_CORRUPT);
    }
    /* header present but payload not fully written yet: stop here */
    if ((next_off += hdr_size + *message_disk_len) > data_len)
      break;

    /* Write our new index offset */
    indices[i++] = data_off;
    if(i >= BUFFERED_INDICES) {
#ifdef DEBUG
      fprintf(stderr, "writing %i offsets\n", i);
#endif
      if (!jlog_file_pwrite(ctx->index, indices, i * sizeof(u_int64_t), index_off))
        RESTART;
      index_off += i * sizeof(u_int64_t);
      i = 0;
    }
    data_off = next_off;
  }
  /* flush the remaining buffered offsets */
  if(i > 0) {
#ifdef DEBUG
    fprintf(stderr, "writing %i offsets\n", i);
#endif
    if (!jlog_file_pwrite(ctx->index, indices, i * sizeof(u_int64_t), index_off))
      RESTART;
    index_off += i * sizeof(u_int64_t);
  }
  if(last) {
    last->log = log;
    last->marker = index_off / sizeof(u_int64_t);
  }
  /* segments older than storage_log receive no more writes: close them */
  if(log < ctx->meta->storage_log) {
    if (data_off != data_len) {
#ifdef DEBUG
      fprintf(stderr, "closing index, but %llu != %llu\n", data_off, data_len);
#endif
      SYS_FAIL(JLOG_ERR_FILE_CORRUPT);
    }
    /* Special case: if we are closing, we next write a '0'
     * we can't write the closing marker if the data segment had no records
     * in it, since it will be confused with an index to offset 0 by the
     * next reader; this only happens when segments are repaired */
    if (index_off) {
      index = 0;
      if (!jlog_file_pwrite(ctx->index, &index, sizeof(u_int64_t), index_off))
        RESTART;
      index_off += sizeof(u_int64_t);
    }
    if(closed) *closed = 1;
  }
#undef RESTART

finish:
  jlog_file_unlock(ctx->index);
#ifdef DEBUG
  fprintf(stderr, "index is %s\n", closed?(*closed?"closed":"open"):"unknown");
#endif
  if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
  return -1;
}
1178
1179 static int __jlog_resync_index(jlog_ctx *ctx, u_int32_t log, jlog_id *last, int *closed) {
1180   int attempts, rv = -1;
1181   for(attempts=0; attempts<4; attempts++) {
1182     rv = ___jlog_resync_index(ctx, log, last, closed);
1183     if(ctx->last_error == JLOG_ERR_SUCCESS) break;
1184     if(ctx->last_error == JLOG_ERR_FILE_OPEN ||
1185        ctx->last_error == JLOG_ERR_IDX_OPEN) break;
1186
1187     /* We can't fix the file if someone may write to it again */
1188     if(log >= ctx->meta->storage_log) break;
1189
1190     jlog_file_lock(ctx->index);
1191     /* it doesn't really matter what jlog_repair_datafile returns
1192      * we'll keep retrying anyway */
1193     jlog_repair_datafile(ctx, log);
1194     jlog_file_truncate(ctx->index, 0);
1195     jlog_file_unlock(ctx->index);
1196   }
1197   return rv;
1198 }
1199
1200 jlog_ctx *jlog_new(const char *path) {
1201   jlog_ctx *ctx;
1202   ctx = calloc(1, sizeof(*ctx));
1203   ctx->meta = &ctx->pre_init;
1204   ctx->pre_init.unit_limit = DEFAULT_UNIT_LIMIT;
1205   ctx->pre_init.safety = DEFAULT_SAFETY;
1206   ctx->pre_init.hdr_magic = DEFAULT_HDR_MAGIC;
1207   ctx->file_mode = DEFAULT_FILE_MODE;
1208   ctx->context_mode = JLOG_NEW;
1209   ctx->path = strdup(path);
1210   ctx->pre_commit_buffer_len = PRE_COMMIT_BUFFER_SIZE_DEFAULT;
1211   ctx->multi_process = 1;
1212   pthread_mutex_init(&ctx->write_lock, NULL);
1213   //  fassertxsetpath(path);
1214   return ctx;
1215 }
1216
1217 void jlog_set_error_func(jlog_ctx *ctx, jlog_error_func Func, void *ptr) {
1218   ctx->error_func = Func;
1219   ctx->error_ctx = ptr;
1220 }
1221
1222 size_t jlog_raw_size(jlog_ctx *ctx) {
1223   DIR *d;
1224   struct dirent *de;
1225   size_t totalsize = 0;
1226   int ferr, len;
1227   char filename[MAXPATHLEN] = {0};
1228
1229   d = opendir(ctx->path);
1230   if(!d) return 0;
1231   len = strlen(ctx->path);
1232   memcpy(filename, ctx->path, len);
1233   filename[len++] = IFS_CH;
1234   while((de = readdir(d)) != NULL) {
1235     struct stat sb;
1236     int dlen;
1237
1238     dlen = strlen(de->d_name);
1239     if((len + dlen + 1) > sizeof(filename)) continue;
1240     memcpy(filename+len, de->d_name, dlen + 1); /* include \0 */
1241     while((ferr = stat(filename, &sb)) == -1 && errno == EINTR);
1242     if(ferr == 0 && S_ISREG(sb.st_mode)) totalsize += sb.st_size;
1243   }
1244   closedir(d);
1245   return totalsize;
1246 }
1247
1248 const char *jlog_ctx_err_string(jlog_ctx *ctx) {
1249   switch (ctx->last_error) {
1250 #define MSG_O_MATIC(x)  case x: return #x;
1251     MSG_O_MATIC( JLOG_ERR_SUCCESS);
1252     MSG_O_MATIC( JLOG_ERR_ILLEGAL_INIT);
1253     MSG_O_MATIC( JLOG_ERR_ILLEGAL_OPEN);
1254     MSG_O_MATIC( JLOG_ERR_OPEN);
1255     MSG_O_MATIC( JLOG_ERR_NOTDIR);
1256     MSG_O_MATIC( JLOG_ERR_CREATE_PATHLEN);
1257     MSG_O_MATIC( JLOG_ERR_CREATE_EXISTS);
1258     MSG_O_MATIC( JLOG_ERR_CREATE_MKDIR);
1259     MSG_O_MATIC( JLOG_ERR_CREATE_META);
1260     MSG_O_MATIC( JLOG_ERR_LOCK);
1261     MSG_O_MATIC( JLOG_ERR_IDX_OPEN);
1262     MSG_O_MATIC( JLOG_ERR_IDX_SEEK);
1263     MSG_O_MATIC( JLOG_ERR_IDX_CORRUPT);
1264     MSG_O_MATIC( JLOG_ERR_IDX_WRITE);
1265     MSG_O_MATIC( JLOG_ERR_IDX_READ);
1266     MSG_O_MATIC( JLOG_ERR_FILE_OPEN);
1267     MSG_O_MATIC( JLOG_ERR_FILE_SEEK);
1268     MSG_O_MATIC( JLOG_ERR_FILE_CORRUPT);
1269     MSG_O_MATIC( JLOG_ERR_FILE_READ);
1270     MSG_O_MATIC( JLOG_ERR_FILE_WRITE);
1271     MSG_O_MATIC( JLOG_ERR_META_OPEN);
1272     MSG_O_MATIC( JLOG_ERR_ILLEGAL_WRITE);
1273     MSG_O_MATIC( JLOG_ERR_ILLEGAL_CHECKPOINT);
1274     MSG_O_MATIC( JLOG_ERR_INVALID_SUBSCRIBER);
1275     MSG_O_MATIC( JLOG_ERR_ILLEGAL_LOGID);
1276     MSG_O_MATIC( JLOG_ERR_SUBSCRIBER_EXISTS);
1277     MSG_O_MATIC( JLOG_ERR_CHECKPOINT);
1278     MSG_O_MATIC( JLOG_ERR_NOT_SUPPORTED);
1279     MSG_O_MATIC( JLOG_ERR_CLOSE_LOGID);
1280     default: return "Unknown";
1281   }
1282 }
1283
1284 int jlog_ctx_err(jlog_ctx *ctx) {
1285   return ctx->last_error;
1286 }
1287
1288 int jlog_ctx_errno(jlog_ctx *ctx) {
1289   return ctx->last_errno;
1290 }
1291
1292 int jlog_ctx_alter_safety(jlog_ctx *ctx, jlog_safety safety) {
1293   if(ctx->meta->safety == safety) return 0;
1294   if(ctx->context_mode == JLOG_APPEND ||
1295      ctx->context_mode == JLOG_NEW) {
1296     ctx->meta->safety = safety;
1297     if(ctx->context_mode == JLOG_APPEND) {
1298       if(__jlog_save_metastore(ctx, 0) != 0) {
1299         FASSERT(0, "jlog_ctx_alter_safety calls jlog_save_metastore");
1300         SYS_FAIL(JLOG_ERR_CREATE_META);
1301       }
1302     }
1303     return 0;
1304   }
1305  finish:
1306   return -1;
1307 }
1308
1309 int jlog_ctx_set_multi_process(jlog_ctx *ctx, uint8_t mp) {
1310   ctx->multi_process = mp;
1311   return 0;
1312 }
1313
1314 int jlog_ctx_set_use_compression(jlog_ctx *ctx, uint8_t use) {
1315   if (use != 0) {
1316     ctx->pre_init.hdr_magic = DEFAULT_HDR_MAGIC_COMPRESSION | JLOG_COMPRESSION_LZ4;
1317     jlog_set_compression_provider(JLOG_COMPRESSION_LZ4);
1318   } else {
1319     ctx->pre_init.hdr_magic = DEFAULT_HDR_MAGIC;
1320   }   
1321   return 0;
1322 }
1323
1324 int jlog_ctx_set_compression_provider(jlog_ctx *ctx, jlog_compression_provider_choice cp) {
1325   if ((ctx->pre_init.hdr_magic & DEFAULT_HDR_MAGIC_COMPRESSION) == DEFAULT_HDR_MAGIC_COMPRESSION) {
1326     /* compression mode is on, set the proper flag */
1327     ctx->pre_init.hdr_magic = DEFAULT_HDR_MAGIC_COMPRESSION | cp;
1328     jlog_set_compression_provider(cp);
1329   }
1330   return 0;
1331 }
1332
1333 int jlog_ctx_set_pre_commit_buffer_size(jlog_ctx *ctx, size_t s) {
1334   ctx->pre_commit_buffer_len = s;
1335   return 0;
1336 }
1337
1338 int jlog_ctx_flush_pre_commit_buffer(jlog_ctx *ctx)
1339 {
1340   off_t current_offset = 0;
1341
1342   /* noop if we aren't using a pre_commit buffer */
1343   if (ctx->pre_commit_buffer_len == 0) {
1344     return 0;
1345   }
1346
1347   ctx->last_error = JLOG_ERR_SUCCESS;
1348   if(ctx->context_mode != JLOG_APPEND) {
1349     ctx->last_error = JLOG_ERR_ILLEGAL_WRITE;
1350     ctx->last_errno = EPERM;
1351     return -1;
1352   }
1353   pthread_mutex_lock(&ctx->write_lock);
1354  begin:
1355   __jlog_open_writer(ctx);
1356   if(!ctx->data) {
1357     ctx->last_error = JLOG_ERR_FILE_OPEN;
1358     ctx->last_errno = errno;
1359     pthread_mutex_unlock(&ctx->write_lock);
1360     return -1;
1361   }
1362
1363   if (!jlog_file_lock(ctx->data)) {
1364     ctx->last_error = JLOG_ERR_LOCK;
1365     ctx->last_errno = errno;
1366     pthread_mutex_unlock(&ctx->write_lock);
1367     return -1;
1368   }
1369
1370   if ((current_offset = jlog_file_size(ctx->data)) == -1)
1371     SYS_FAIL(JLOG_ERR_FILE_SEEK);
1372   if(ctx->meta->unit_limit <= current_offset) {
1373     jlog_file_unlock(ctx->data);
1374     __jlog_close_writer(ctx);
1375     __jlog_metastore_atomic_increment(ctx);
1376     goto begin;
1377   }
1378
1379   /* we have to flush our pre_commit_buffer out to the real log */
1380   if (!jlog_file_pwrite(ctx->data, ctx->pre_commit_buffer,
1381                         ctx->pre_commit_pos - ctx->pre_commit_buffer,
1382                         current_offset)) {
1383     FASSERT(0, "jlog_file_pwrite failed in jlog_ctx_write_message");
1384     SYS_FAIL(JLOG_ERR_FILE_WRITE);
1385   }
1386
1387   current_offset += ctx->pre_commit_pos - ctx->pre_commit_buffer;
1388
1389   /* rewind the pre_commit_buffer to beginning */
1390   ctx->pre_commit_pos = ctx->pre_commit_buffer;
1391   /* ensure we save this in the mmapped data */
1392   *ctx->pre_commit_pointer = 0;
1393
1394   if(ctx->meta->unit_limit <= current_offset) {
1395     jlog_file_unlock(ctx->data);
1396     __jlog_close_writer(ctx);
1397     __jlog_metastore_atomic_increment(ctx);
1398     pthread_mutex_unlock(&ctx->write_lock);
1399     return 0;
1400   }
1401  finish:
1402   jlog_file_unlock(ctx->data);
1403   pthread_mutex_unlock(&ctx->write_lock);
1404   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1405   return -1;
1406 }
1407
1408
1409 int jlog_ctx_alter_journal_size(jlog_ctx *ctx, size_t size) {
1410   if(ctx->meta->unit_limit == size) return 0;
1411   if(ctx->context_mode == JLOG_APPEND ||
1412      ctx->context_mode == JLOG_NEW) {
1413     ctx->meta->unit_limit = size;
1414     if(ctx->context_mode == JLOG_APPEND) {
1415       if(__jlog_save_metastore(ctx, 0) != 0) {
1416         FASSERT(0, "jlog_ctx_alter_journal_size calls jlog_save_metastore");
1417         SYS_FAIL(JLOG_ERR_CREATE_META);
1418       }
1419     }
1420     return 0;
1421   }
1422  finish:
1423   return -1;
1424 }
1425 int jlog_ctx_alter_mode(jlog_ctx *ctx, int mode) {
1426   ctx->file_mode = mode;
1427   return 0;
1428 }
1429 int jlog_ctx_open_writer(jlog_ctx *ctx) {
1430   int rv;
1431   struct stat sb;
1432
1433   pthread_mutex_lock(&ctx->write_lock);
1434   ctx->last_error = JLOG_ERR_SUCCESS;
1435   if(ctx->context_mode != JLOG_NEW) {
1436     ctx->last_error = JLOG_ERR_ILLEGAL_OPEN;
1437     pthread_mutex_unlock(&ctx->write_lock);
1438     return -1;
1439   }
1440   ctx->context_mode = JLOG_APPEND;
1441   while((rv = stat(ctx->path, &sb)) == -1 && errno == EINTR);
1442   if(rv == -1) SYS_FAIL(JLOG_ERR_OPEN);
1443   if(!S_ISDIR(sb.st_mode)) SYS_FAIL(JLOG_ERR_NOTDIR);
1444   FASSERT(ctx != NULL, "jlog_ctx_open_writer");
1445   if(__jlog_open_metastore(ctx) != 0) {
1446     FASSERT(0, "jlog_ctx_open_writer calls jlog_open_metastore");
1447     SYS_FAIL(JLOG_ERR_META_OPEN);
1448   }
1449   if(__jlog_restore_metastore(ctx, 0)) {
1450     FASSERT(0, "jlog_ctx_open_writer calls jlog_restore_metastore");
1451     SYS_FAIL(JLOG_ERR_META_OPEN);
1452   }
1453   if (__jlog_open_pre_commit(ctx) != 0) {
1454     FASSERT(0, "jlog_ctx_open_writer calls jlog_open_pre_commit");
1455     SYS_FAIL(JLOG_ERR_PRE_COMMIT_OPEN);
1456   }
1457
1458   if (__jlog_map_pre_commit(ctx) != 0) {
1459     FASSERT(0, "jlog_ctx_open_writer calls jlog_map_pre_commit");
1460     SYS_FAIL(JLOG_ERR_PRE_COMMIT_OPEN);
1461   }
1462    
1463  finish:
1464   pthread_mutex_unlock(&ctx->write_lock);
1465   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1466   ctx->context_mode = JLOG_INVALID;
1467   return -1;
1468 }
1469 int jlog_ctx_open_reader(jlog_ctx *ctx, const char *subscriber) {
1470   int rv;
1471   struct stat sb;
1472   jlog_id dummy;
1473
1474   ctx->last_error = JLOG_ERR_SUCCESS;
1475   if(ctx->context_mode != JLOG_NEW) {
1476     ctx->last_error = JLOG_ERR_ILLEGAL_OPEN;
1477     return -1;
1478   }
1479   ctx->context_mode = JLOG_READ;
1480   ctx->subscriber_name = strdup(subscriber);
1481   while((rv = stat(ctx->path, &sb)) == -1 && errno == EINTR);
1482   if(rv == -1) SYS_FAIL(JLOG_ERR_OPEN);
1483   if(!S_ISDIR(sb.st_mode)) SYS_FAIL(JLOG_ERR_NOTDIR);
1484   FASSERT(ctx != NULL, "__jlog_ctx_open_reader");
1485   if(__jlog_open_metastore(ctx) != 0) {
1486     FASSERT(0, "jlog_ctx_open_reader calls jlog_open_metastore");
1487     SYS_FAIL(JLOG_ERR_META_OPEN);
1488   }
1489   if(jlog_get_checkpoint(ctx, ctx->subscriber_name, &dummy))
1490     SYS_FAIL(JLOG_ERR_INVALID_SUBSCRIBER);
1491   if(__jlog_restore_metastore(ctx, 0)) {
1492     FASSERT(0, "jlog_ctx_open_reader calls jlog_restore_metastore");
1493     SYS_FAIL(JLOG_ERR_META_OPEN);
1494   }
1495  finish:
1496   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1497   ctx->context_mode = JLOG_INVALID;
1498   return -1;
1499 }
1500
1501 int jlog_ctx_init(jlog_ctx *ctx) {
1502   int rv;
1503   struct stat sb;
1504   int dirmode;
1505
1506   ctx->multi_process = 1;
1507   ctx->last_error = JLOG_ERR_SUCCESS;
1508   if(strlen(ctx->path) > MAXLOGPATHLEN-1) {
1509     ctx->last_error = JLOG_ERR_CREATE_PATHLEN;
1510     return -1;
1511   }
1512   if(ctx->context_mode != JLOG_NEW) {
1513     ctx->last_error = JLOG_ERR_ILLEGAL_INIT;
1514     return -1;
1515   }
1516   ctx->context_mode = JLOG_INIT;
1517   while((rv = stat(ctx->path, &sb)) == -1 && errno == EINTR);
1518   if(rv == 0 || errno != ENOENT) {
1519     SYS_FAIL_EX(JLOG_ERR_CREATE_EXISTS, 0);
1520   }
1521   dirmode = ctx->file_mode;
1522   if(dirmode & 0400) dirmode |= 0100;
1523   if(dirmode & 040) dirmode |= 010;
1524   if(dirmode & 04) dirmode |= 01;
1525   if(mkdir(ctx->path, dirmode) == -1)
1526     SYS_FAIL(JLOG_ERR_CREATE_MKDIR);
1527   chmod(ctx->path, dirmode);
1528   // fassertxsetpath(ctx->path);
1529   /* Setup our initial state and store our instance metadata */
1530   if(__jlog_open_metastore(ctx) != 0) {
1531     FASSERT(0, "jlog_ctx_init calls jlog_open_metastore");
1532     SYS_FAIL(JLOG_ERR_CREATE_META);
1533   }
1534   if(__jlog_save_metastore(ctx, 0) != 0) {
1535     FASSERT(0, "jlog_ctx_init calls jlog_save_metastore");
1536     SYS_FAIL(JLOG_ERR_CREATE_META);
1537   }
1538   //  FASSERT(0, "Start of fassert log");
1539  finish:
1540   FASSERT(ctx->last_error == JLOG_ERR_SUCCESS, "jlog_ctx_init failed");
1541   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1542   return -1;
1543 }
1544
1545 int jlog_ctx_close(jlog_ctx *ctx) {
1546   jlog_ctx_flush_pre_commit_buffer(ctx);
1547   __jlog_close_writer(ctx);
1548   __jlog_close_pre_commit(ctx);
1549   __jlog_close_indexer(ctx);
1550   __jlog_close_reader(ctx);
1551   __jlog_close_metastore(ctx);
1552   __jlog_close_checkpoint(ctx);
1553   if(ctx->subscriber_name) free(ctx->subscriber_name);
1554   if(ctx->path) free(ctx->path);
1555   free(ctx);
1556   return 0;
1557 }
1558
1559 static int __jlog_metastore_atomic_increment(jlog_ctx *ctx) {
1560   char file[MAXPATHLEN] = {0};
1561
1562 #ifdef DEBUG
1563   fprintf(stderr, "atomic increment on %u\n", ctx->current_log);
1564 #endif
1565   FASSERT(ctx != NULL, "__jlog_metastore_atomic_increment");
1566   if(ctx->data) SYS_FAIL(JLOG_ERR_NOT_SUPPORTED);
1567   if (!jlog_file_lock(ctx->metastore))
1568     SYS_FAIL(JLOG_ERR_LOCK);
1569   if(__jlog_restore_metastore(ctx, 1)) {
1570     FASSERT(0,
1571             "jlog_metastore_atomic_increment calls jlog_restore_metastore");
1572     SYS_FAIL(JLOG_ERR_META_OPEN);
1573   }
1574   if(ctx->meta->storage_log == ctx->current_log) {
1575     /* We're the first ones to it, so we get to increment it */
1576     ctx->current_log++;
1577     STRSETDATAFILE(ctx, file, ctx->current_log);
1578     ctx->data = jlog_file_open(file, O_CREAT, ctx->file_mode, ctx->multi_process);
1579     ctx->meta->storage_log = ctx->current_log;
1580     if(__jlog_save_metastore(ctx, 1)) {
1581       FASSERT(0,
1582               "jlog_metastore_atomic_increment calls jlog_save_metastore");
1583       SYS_FAIL(JLOG_ERR_META_OPEN);
1584     }
1585   }
1586  finish:
1587   jlog_file_unlock(ctx->metastore);
1588   /* Now we update our curent_log to the current storage_log,
1589    * it may have advanced farther than we know.
1590    */
1591   ctx->current_log = ctx->meta->storage_log;
1592   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1593   return -1;
1594 }
1595
1596 int jlog_ctx_write_message(jlog_ctx *ctx, jlog_message *mess, struct timeval *when) {
1597   struct timeval now;
1598   jlog_message_header_compressed hdr;
1599   off_t current_offset = 0;
1600   size_t hdr_size = sizeof(jlog_message_header);
1601   int i = 0;
1602
1603   if (IS_COMPRESS_MAGIC(ctx)) {
1604     hdr_size = sizeof(jlog_message_header_compressed);
1605   }
1606
1607   ctx->last_error = JLOG_ERR_SUCCESS;
1608   if(ctx->context_mode != JLOG_APPEND) {
1609     ctx->last_error = JLOG_ERR_ILLEGAL_WRITE;
1610     ctx->last_errno = EPERM;
1611     return -1;
1612   }
1613
1614   /* build the data we want to write outside of any lock */
1615   hdr.reserved = ctx->meta->hdr_magic;
1616   if (when) {
1617     hdr.tv_sec = when->tv_sec;
1618     hdr.tv_usec = when->tv_usec;
1619   } else {
1620     gettimeofday(&now, NULL);
1621     hdr.tv_sec = now.tv_sec;
1622     hdr.tv_usec = now.tv_usec;
1623   }
1624
1625   /* we store the original message size in the header */
1626   hdr.mlen = mess->mess_len;
1627
1628   struct iovec v[2];
1629   v[0].iov_base = (void *) &hdr;
1630   v[0].iov_len = hdr_size;
1631
1632   /* create a stack space to compress into which is large enough for most messages to compress into */
1633   char compress_space[16384] = {0};
1634   v[1].iov_base = compress_space;
1635   size_t compressed_len = sizeof(compress_space);
1636
1637   if (IS_COMPRESS_MAGIC(ctx)) {
1638     if (jlog_compress(mess->mess, mess->mess_len, (char **)&v[1].iov_base, &compressed_len) != 0) {
1639       FASSERT(0, "jlog_compress failed in jlog_ctx_write_message");
1640       SYS_FAIL(JLOG_ERR_FILE_WRITE);
1641     }
1642     hdr.compressed_len = compressed_len;
1643     v[1].iov_len = hdr.compressed_len;
1644   } else {
1645     v[1].iov_base = mess->mess;
1646     v[1].iov_len = mess->mess_len;
1647   }
1648
1649   size_t total_size = v[0].iov_len + v[1].iov_len;
1650
1651   /* now grab the file lock and write to pre_commit or file depending */
1652   /**
1653    * this needs to be synchronized as concurrent writers can
1654    * overwrite the shared ctx->data pointer as they move through
1655    * individual file segments.
1656    *
1657    * Thread A-> open, write to existing segment,
1658    * Thread B-> check open (already open)
1659    * Thread A-> close and null out ctx->data pointer
1660    * Thread B-> wha?!?
1661    */
1662   pthread_mutex_lock(&ctx->write_lock);
1663  begin:
1664   __jlog_open_writer(ctx);
1665   if(!ctx->data) {
1666     ctx->last_error = JLOG_ERR_FILE_OPEN;
1667     ctx->last_errno = errno;
1668     pthread_mutex_unlock(&ctx->write_lock);
1669     return -1;
1670   }
1671
1672   if (!jlog_file_lock(ctx->data)) {
1673     ctx->last_error = JLOG_ERR_LOCK;
1674     ctx->last_errno = errno;
1675     pthread_mutex_unlock(&ctx->write_lock);
1676     return -1;
1677   }
1678
1679   if (ctx->pre_commit_buffer_len > 0 &&
1680       ctx->pre_commit_pos + total_size > ctx->pre_commit_end)
1681     {
1682     if ((current_offset = jlog_file_size(ctx->data)) == -1)
1683       SYS_FAIL(JLOG_ERR_FILE_SEEK);
1684     if(ctx->meta->unit_limit <= current_offset) {
1685       jlog_file_unlock(ctx->data);
1686       __jlog_close_writer(ctx);
1687       __jlog_metastore_atomic_increment(ctx);
1688       if (IS_COMPRESS_MAGIC(ctx) && v[1].iov_base != compress_space) {
1689         free(v[1].iov_base);
1690       }
1691       goto begin;
1692     }
1693
1694     /* we have to flush our pre_commit_buffer out to the real log */
1695     if (!jlog_file_pwrite(ctx->data, ctx->pre_commit_buffer,
1696                           ctx->pre_commit_pos - ctx->pre_commit_buffer,
1697                           current_offset)) {
1698       FASSERT(0, "jlog_file_pwrite failed in jlog_ctx_write_message");
1699       SYS_FAIL(JLOG_ERR_FILE_WRITE);
1700     }
1701
1702     /* rewind the pre_commit_buffer to beginning */
1703     ctx->pre_commit_pos = ctx->pre_commit_buffer;
1704     /* ensure we save this in the mmapped data */
1705     *ctx->pre_commit_pointer = 0;
1706   }
1707  
1708   if (total_size <= ctx->pre_commit_buffer_len) {
1709     /**
1710      * Write the iovecs to the pre-commit buffer
1711      *
1712      * This is protected by the file lock on the main data file so needs no special treatment
1713      */
1714     for (i = 0; i < 2; i++) {
1715       memcpy(ctx->pre_commit_pos, v[i].iov_base, v[i].iov_len);
1716       ctx->pre_commit_pos += v[i].iov_len;
1717       *ctx->pre_commit_pointer += v[i].iov_len;
1718     }
1719   } else {
1720     /* incoming message won't fit in pre_commit buffer, write directly */
1721     if (!jlog_file_pwritev(ctx->data, v, 2, current_offset)) {
1722       FASSERT(0, "jlog_file_pwritev failed in jlog_ctx_write_message");
1723       SYS_FAIL(JLOG_ERR_FILE_WRITE);
1724     }
1725   }
1726
1727   current_offset += v[0].iov_len + v[1].iov_len;
1728  
1729   if (IS_COMPRESS_MAGIC(ctx) && v[1].iov_base != compress_space) {
1730     free(v[1].iov_base);
1731   }
1732
1733   if(ctx->meta->unit_limit <= current_offset) {
1734     jlog_file_unlock(ctx->data);
1735     __jlog_close_writer(ctx);
1736     __jlog_metastore_atomic_increment(ctx);
1737     pthread_mutex_unlock(&ctx->write_lock);
1738     return 0;
1739   }
1740  finish:
1741   jlog_file_unlock(ctx->data);
1742   pthread_mutex_unlock(&ctx->write_lock);
1743   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1744   return -1;
1745 }
1746
1747 int jlog_ctx_read_checkpoint(jlog_ctx *ctx, const jlog_id *chkpt) {
1748   ctx->last_error = JLOG_ERR_SUCCESS;
1749  
1750   if(ctx->context_mode != JLOG_READ) {
1751     ctx->last_error = JLOG_ERR_ILLEGAL_CHECKPOINT;
1752     ctx->last_errno = EPERM;
1753     return -1;
1754   }
1755   if(__jlog_set_checkpoint(ctx, ctx->subscriber_name, chkpt) != 0) {
1756     ctx->last_error = JLOG_ERR_CHECKPOINT;
1757     ctx->last_errno = 0;
1758     return -1;
1759   }
1760   return 0;
1761 }
1762
1763 int jlog_ctx_remove_subscriber(jlog_ctx *ctx, const char *s) {
1764   char name[MAXPATHLEN];
1765   int rv;
1766
1767   compute_checkpoint_filename(ctx, s, name);
1768   rv = unlink(name);
1769
1770   if (rv == 0) {
1771     ctx->last_error = JLOG_ERR_SUCCESS;
1772     return 1;
1773   }
1774   if (errno == ENOENT) {
1775     ctx->last_error = JLOG_ERR_INVALID_SUBSCRIBER;
1776     return 0;
1777   }
1778   return -1;
1779 }
1780
1781 int jlog_ctx_add_subscriber(jlog_ctx *ctx, const char *s, jlog_position whence) {
1782   jlog_id chkpt;
1783   jlog_ctx *tmpctx = NULL;
1784   jlog_file *jchkpt;
1785   ctx->last_error = JLOG_ERR_SUCCESS;
1786
1787   jchkpt = __jlog_open_named_checkpoint(ctx, s, O_CREAT|O_EXCL);
1788   if(!jchkpt) {
1789     ctx->last_errno = errno;
1790     if(errno == EEXIST)
1791       ctx->last_error = JLOG_ERR_SUBSCRIBER_EXISTS;
1792     else
1793       ctx->last_error = JLOG_ERR_OPEN;
1794     return -1;
1795   }
1796   jlog_file_close(jchkpt);
1797  
1798   if(whence == JLOG_BEGIN) {
1799     memset(&chkpt, 0, sizeof(chkpt));
1800     jlog_ctx_first_log_id(ctx, &chkpt);
1801     if(__jlog_set_checkpoint(ctx, s, &chkpt) != 0) {
1802       ctx->last_error = JLOG_ERR_CHECKPOINT;
1803       ctx->last_errno = 0;
1804       return -1;
1805     }
1806     return 0;
1807   }
1808   if(whence == JLOG_END) {
1809     jlog_id start, finish;
1810     memset(&chkpt, 0, sizeof(chkpt));
1811     FASSERT(ctx != NULL, "__jlog_ctx_add_subscriber");
1812     if(__jlog_open_metastore(ctx) != 0) {
1813       FASSERT(0, "jlog_ctx_add_subscriber calls jlog_open_metastore");
1814       SYS_FAIL(JLOG_ERR_META_OPEN);
1815     }
1816     if(__jlog_restore_metastore(ctx, 0)) {
1817       FASSERT(0, "jlog_ctx_add_subscriber calls jlog_restore_metastore");
1818       SYS_FAIL(JLOG_ERR_META_OPEN);
1819     }
1820     chkpt.log = ctx->meta->storage_log;
1821     if(__jlog_set_checkpoint(ctx, s, &chkpt) != 0)
1822       SYS_FAIL(JLOG_ERR_CHECKPOINT);
1823     tmpctx = jlog_new(ctx->path);
1824     if(jlog_ctx_open_reader(tmpctx, s) < 0) goto finish;
1825     if(jlog_ctx_read_interval(tmpctx, &start, &finish) < 0) goto finish;
1826     jlog_ctx_close(tmpctx);
1827     tmpctx = NULL;
1828     if(__jlog_set_checkpoint(ctx, s, &finish) != 0)
1829       SYS_FAIL(JLOG_ERR_CHECKPOINT);
1830     return 0;
1831   }
1832   ctx->last_error = JLOG_ERR_NOT_SUPPORTED;
1833  finish:
1834   if(tmpctx) jlog_ctx_close(tmpctx);
1835   return -1;
1836 }
1837
1838 int jlog_ctx_add_subscriber_copy_checkpoint(jlog_ctx *old_ctx, const char *new,
1839                                 const char *old) {
1840   jlog_id chkpt;
1841   jlog_ctx *new_ctx = NULL;;
1842
1843   /* If there's no old checkpoint available, just return */
1844   if (jlog_get_checkpoint(old_ctx, old, &chkpt)) {
1845     return -1;
1846   }
1847
1848   /* If we can't open the jlog_ctx, just return */
1849   new_ctx = jlog_new(old_ctx->path);
1850   if (!new_ctx) {
1851     return -1;
1852   }
1853   if (jlog_ctx_add_subscriber(new_ctx, new, JLOG_BEGIN)) {
1854     /* If it already exists, we want to overwrite it */
1855     if (errno != EEXIST) {
1856       jlog_ctx_close(new_ctx);
1857       return -1;
1858     }
1859   }
1860
1861   /* Open a reader for the new subscriber */
1862   if(jlog_ctx_open_reader(new_ctx, new) < 0) {
1863     jlog_ctx_close(new_ctx);
1864     return -1;
1865   }
1866
1867   /* Set the checkpoint of the new subscriber to
1868    * the old subscriber's checkpoint */
1869   if (jlog_ctx_read_checkpoint(new_ctx, &chkpt)) {
1870     return -1;
1871   }
1872
1873   jlog_ctx_close(new_ctx);
1874   return 0;
1875 }
1876
1877 int jlog_ctx_set_subscriber_checkpoint(jlog_ctx *ctx, const char *s,
1878                                 const jlog_id *checkpoint)
1879 {
1880
1881   if (jlog_ctx_add_subscriber(ctx, s, JLOG_BEGIN)) {
1882     if (errno != EEXIST) {
1883       return -1;
1884     }
1885   }
1886
1887   return __jlog_set_checkpoint(ctx, s, checkpoint);
1888 }
1889
1890
1891 int jlog_ctx_write(jlog_ctx *ctx, const void *data, size_t len) {
1892   jlog_message m;
1893   m.mess = (void *)data;
1894   m.mess_len = len;
1895   return jlog_ctx_write_message(ctx, &m, NULL);
1896 }
1897
/*
 * Starting from checkpoint `chkpt`, determine the readable range
 * [start, finish] for a reader.
 *
 * - The checkpoint's segment index is resynced; `finish` normally becomes
 *   the last indexed position of that segment.
 * - If the segment's data file is missing (ENOENT), try to skip forward to
 *   the next segment, but only when one exists with a non-empty index and
 *   we are behind meta->storage_log.
 * - If `start` sits at the end of a closed segment, advance to the next
 *   segment under the same conditions and try again.
 * - Whenever we cannot safely advance, finish == start (nothing to read).
 *
 * Returns 0 on success, -1 when resync fails for another reason (its
 * error state is kept) or a path would not fit in MAXPATHLEN.
 */
static int __jlog_find_first_log_after(jlog_ctx *ctx, jlog_id *chkpt,
                                jlog_id *start, jlog_id *finish) {
  jlog_id last;
  int closed;

  memcpy(start, chkpt, sizeof(*chkpt));
 attempt:
  if(__jlog_resync_index(ctx, start->log, &last, &closed) != 0) {
    /* only a missing data file is recoverable here */
    if(ctx->last_error == JLOG_ERR_FILE_OPEN &&
        ctx->last_errno == ENOENT) {
      char file[MAXPATHLEN];
      int ferr, len;
      struct stat sb = {0};

      memset(file, 0, sizeof(file));
      STRSETDATAFILE(ctx, file, start->log + 1);
      while((ferr = stat(file, &sb)) == -1 && errno == EINTR);
      /* That file doesn't exist... bad, but we can fake a recovery by
         advancing the next file that does exist */
      ctx->last_error = JLOG_ERR_SUCCESS;
      if(start->log >= ctx->meta->storage_log || (ferr != 0 && errno != ENOENT)) {
        /* We don't advance past where people are writing */
        memcpy(finish, start, sizeof(*start));
        return 0;
      }
      if(__jlog_resync_index(ctx, start->log + 1, &last, &closed) != 0) {
        /* We don't advance past where people are writing */
        memcpy(finish, start, sizeof(*start));
        return 0;
      }
      /* only move on if the next segment's index has entries */
      len = strlen(file);
      if((len + sizeof(INDEX_EXT)) > sizeof(file)) return -1;
      memcpy(file + len, INDEX_EXT, sizeof(INDEX_EXT));
      while((ferr = stat(file, &sb)) == -1 && errno == EINTR);
      if(ferr != 0 || sb.st_size == 0) {
        /* We don't advance past where people are writing */
        memcpy(finish, start, sizeof(*start));
        return 0;
      }
      start->marker = 0;
      start->log++;  /* BE SMARTER! */
      goto attempt;
    }
    return -1; /* Just persist resync's error state */
  }

  /* If someone checkpoints off the end, be nice */
  if(last.log == start->log && last.marker < start->marker)
    memcpy(start, &last, sizeof(*start));

  /* caught up on a closed segment: look at the next one */
  if(!memcmp(start, &last, sizeof(last)) && closed) {
    char file[MAXPATHLEN];
    int ferr, len;
    struct stat sb = {0};

    memset(file, 0, sizeof(file));
    STRSETDATAFILE(ctx, file, start->log + 1);
    while((ferr = stat(file, &sb)) == -1 && errno == EINTR);
    if(ferr) {
      /* NOTE(review): library code printing to stderr unconditionally;
       * consider routing through ctx->error_func or DEBUG. */
      fprintf(stderr, "stat(%s) error: %s\n", file, strerror(errno));
      /* next segment is missing: jump two segments ahead when that is
       * still behind storage_log.
       * NOTE(review): if storage_log is unsigned and 0, storage_log - 1
       * wraps and this test passes -- confirm that state is unreachable. */
      if(start->log < ctx->meta->storage_log - 1) {
        start->marker = 0;
        start->log += 2;
        memcpy(finish, start, sizeof(*start));
        return 0;
      }
    }
    if(start->log >= ctx->meta->storage_log || ferr != 0 || sb.st_size == 0) {
      /* We don't advance past where people are writing */
      memcpy(finish, start, sizeof(*start));
      return 0;
    }
    if(__jlog_resync_index(ctx, start->log + 1, &last, &closed) != 0) {
      /* We don't advance past where people are writing */
      memcpy(finish, start, sizeof(*start));
      return 0;
    }
    /* only move on if the next segment's index has entries */
    len = strlen(file);
    if((len + sizeof(INDEX_EXT)) > sizeof(file)) return -1;
    memcpy(file + len, INDEX_EXT, sizeof(INDEX_EXT));
    while((ferr = stat(file, &sb)) == -1 && errno == EINTR);
    if(ferr != 0 || sb.st_size == 0) {
      /* We don't advance past where people are writing */
      memcpy(finish, start, sizeof(*start));
      return 0;
    }
    start->marker = 0;
    start->log++;
    goto attempt;
  }
  memcpy(finish, &last, sizeof(last));
  return 0;
}
1991 int jlog_ctx_read_message(jlog_ctx *ctx, const jlog_id *id, jlog_message *m) {
1992   off_t index_len;
1993   u_int64_t data_off;
1994   int with_lock = 0;
1995   size_t hdr_size = 0;
1996   uint32_t *message_disk_len = &m->aligned_header.mlen;
1997
1998   if (IS_COMPRESS_MAGIC(ctx)) {
1999     hdr_size = sizeof(jlog_message_header_compressed);
2000     message_disk_len = &m->aligned_header.compressed_len;
2001   } else {
2002     hdr_size = sizeof(jlog_message_header);
2003   }
2004
2005  once_more_with_lock:
2006
2007   ctx->last_error = JLOG_ERR_SUCCESS;
2008   if (ctx->context_mode != JLOG_READ)
2009     SYS_FAIL(JLOG_ERR_ILLEGAL_WRITE);
2010   if (id->marker < 1) {
2011     SYS_FAIL(JLOG_ERR_ILLEGAL_LOGID);
2012   }
2013
2014   __jlog_open_reader(ctx, id->log);
2015   if(!ctx->data)
2016     SYS_FAIL(JLOG_ERR_FILE_OPEN);
2017   __jlog_open_indexer(ctx, id->log);
2018   if(!ctx->index)
2019     SYS_FAIL(JLOG_ERR_IDX_OPEN);
2020
2021   if(with_lock) {
2022     if (!jlog_file_lock(ctx->index)) {
2023       with_lock = 0;
2024       SYS_FAIL(JLOG_ERR_LOCK);
2025     }
2026   }
2027
2028   if ((index_len = jlog_file_size(ctx->index)) == -1)
2029     SYS_FAIL(JLOG_ERR_IDX_SEEK);
2030   if (index_len % sizeof(u_int64_t))
2031     SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
2032   if (id->marker * sizeof(u_int64_t) > index_len) {
2033     SYS_FAIL(JLOG_ERR_ILLEGAL_LOGID);
2034   }
2035
2036   if (!jlog_file_pread(ctx->index, &data_off, sizeof(u_int64_t),
2037                        (id->marker - 1) * sizeof(u_int64_t)))
2038   {
2039     SYS_FAIL(JLOG_ERR_IDX_READ);
2040   }
2041   if (data_off == 0 && id->marker != 1) {
2042     if (id->marker * sizeof(u_int64_t) == index_len) {
2043       /* close tag; not a real offset */
2044       ctx->last_error = JLOG_ERR_CLOSE_LOGID;
2045       ctx->last_errno = 0;
2046       if(with_lock) jlog_file_unlock(ctx->index);
2047       return -1;
2048     } else {
2049       /* an offset of 0 in the middle of an index means curruption */
2050       SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
2051     }
2052   }
2053
2054   if(__jlog_mmap_reader(ctx, id->log) != 0)
2055     SYS_FAIL(JLOG_ERR_FILE_READ);
2056
2057   if(data_off > ctx->mmap_len - hdr_size) {
2058 #ifdef DEBUG
2059     fprintf(stderr, "read idx off end: %llu\n", data_off);
2060 #endif
2061     SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
2062   }
2063
2064   memcpy(&m->aligned_header, ((u_int8_t *)ctx->mmap_base) + data_off,
2065          hdr_size);
2066
2067   if(data_off + hdr_size + *message_disk_len > ctx->mmap_len) {
2068 #ifdef DEBUG
2069     fprintf(stderr, "read idx off end: %llu %llu\n", data_off, ctx->mmap_len);
2070 #endif
2071     SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
2072   }
2073
2074   m->header = &m->aligned_header;
2075
2076   if (IS_COMPRESS_MAGIC(ctx)) {
2077     if (ctx->mess_data_size < m->aligned_header.mlen) {
2078       ctx->mess_data = realloc(ctx->mess_data, m->aligned_header.mlen * 2);
2079       ctx->mess_data_size = m->aligned_header.mlen * 2;
2080     }
2081     jlog_decompress((((char *)ctx->mmap_base) + data_off + hdr_size),
2082                     m->header->compressed_len, ctx->mess_data, ctx->mess_data_size);
2083     m->mess_len = m->header->mlen;
2084     m->mess = ctx->mess_data;
2085   } else {
2086     m->mess_len = m->header->mlen;
2087     m->mess = (((u_int8_t *)ctx->mmap_base) + data_off + hdr_size);
2088   }
2089
2090  finish:
2091   if(with_lock) jlog_file_unlock(ctx->index);
2092   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
2093   if(!with_lock) {
2094     if (ctx->last_error == JLOG_ERR_IDX_CORRUPT) {
2095       if (jlog_file_lock(ctx->index)) {
2096         jlog_file_truncate(ctx->index, 0);
2097         jlog_file_unlock(ctx->index);
2098       }
2099     }
2100     ___jlog_resync_index(ctx, id->log, NULL, NULL);
2101     with_lock = 1;
2102 #ifdef DEBUG
2103     fprintf(stderr, "read retrying with lock\n");
2104 #endif
2105     goto once_more_with_lock;
2106   }
2107   return -1;
2108 }
2109 int jlog_ctx_read_interval(jlog_ctx *ctx, jlog_id *start, jlog_id *finish) {
2110   jlog_id chkpt;
2111   int count = 0;
2112
2113   ctx->last_error = JLOG_ERR_SUCCESS;
2114   if(ctx->context_mode != JLOG_READ) {
2115     ctx->last_error = JLOG_ERR_ILLEGAL_WRITE;
2116     ctx->last_errno = EPERM;
2117     return -1;
2118   }
2119
2120   __jlog_restore_metastore(ctx, 0);
2121   if(jlog_get_checkpoint(ctx, ctx->subscriber_name, &chkpt))
2122     SYS_FAIL(JLOG_ERR_INVALID_SUBSCRIBER);
2123   if(__jlog_find_first_log_after(ctx, &chkpt, start, finish) != 0)
2124     goto finish; /* Leave whatever error was set in find_first_log_after */
2125   if(start->log != chkpt.log) start->marker = 0;
2126   else start->marker = chkpt.marker;
2127   if(start->log != chkpt.log) {
2128     /* We've advanced our checkpoint, let's not do this work again */
2129     if(__jlog_set_checkpoint(ctx, ctx->subscriber_name, start) != 0)
2130       SYS_FAIL(JLOG_ERR_CHECKPOINT);
2131   }
2132   /* Here 'start' is actually the checkpoint, so we must advance it one.
2133      However, that may not be possible, if there are no messages, so first
2134      make sure finish is bigger */
2135   count = finish->marker - start->marker;
2136   if(finish->marker > start->marker) start->marker++;
2137
2138   /* If the count is less than zero, the checkpoint is off the end
2139    * of the file. When this happens, we need to set it to the end of
2140    * the file */
2141   if (count < 0) {
2142     fprintf(stderr, "need to repair checkpoint for %s - start (%08x:%08x) > finish (%08x:%08x)\n", ctx->path,
2143         start->log, start->marker, finish->log, finish->marker);
2144     if(__jlog_set_checkpoint(ctx, ctx->subscriber_name, finish) != 0) {
2145       fprintf(stderr, "failed repairing checkpoint for %s\n", ctx->path);
2146       SYS_FAIL(JLOG_ERR_CHECKPOINT);
2147     }
2148     if(jlog_get_checkpoint(ctx, ctx->subscriber_name, &chkpt)) {
2149       /* Should never happen */
2150       SYS_FAIL(JLOG_ERR_INVALID_SUBSCRIBER);
2151     }
2152     fprintf(stderr, "repaired checkpoint for %s: %08x:%08x\n", ctx->path, chkpt.log, chkpt.marker);
2153     ctx->last_error = JLOG_ERR_SUCCESS;
2154     count = 0;
2155   }
2156
2157   /* We need to munmap it, so that we can remap it with more data if needed */
2158   __jlog_munmap_reader(ctx);
2159  finish:
2160   if(ctx->last_error == JLOG_ERR_SUCCESS) return count;
2161   return -1;
2162 }
2163
2164 int jlog_ctx_first_log_id(jlog_ctx *ctx, jlog_id *id) {
2165   DIR *d;
2166   struct dirent *de;
2167   ctx->last_error = JLOG_ERR_SUCCESS;
2168   u_int32_t log;
2169   int found = 0;
2170
2171   id->log = 0xffffffff;
2172   id->marker = 0;
2173   d = opendir(ctx->path);
2174   if (!d) return -1;
2175
2176   while ((de = readdir(d))) {
2177     int i;
2178     char *cp = de->d_name;
2179     if(strlen(cp) != 8) continue;
2180     log = 0;
2181     for(i=0;i<8;i++) {
2182       log <<= 4;
2183       if(cp[i] >= '0' && cp[i] <= '9') log |= (cp[i] - '0');
2184       else if(cp[i] >= 'a' && cp[i] <= 'f') log |= (cp[i] - 'a' + 0xa);
2185       else if(cp[i] >= 'A' && cp[i] <= 'F') log |= (cp[i] - 'A' + 0xa);
2186       else break;
2187     }
2188     if(i != 8) continue;
2189     found = 1;
2190     if(log < id->log) id->log = log;
2191   }
2192   if(!found) id->log = 0;
2193   closedir(d);
2194   return 0;
2195 }
2196
2197 int jlog_ctx_last_log_id(jlog_ctx *ctx, jlog_id *id) {
2198   ctx->last_error = JLOG_ERR_SUCCESS;
2199   if(ctx->context_mode != JLOG_READ) {
2200     ctx->last_error = JLOG_ERR_ILLEGAL_WRITE;
2201     ctx->last_errno = EPERM;
2202     return -1;
2203   }
2204   if (__jlog_restore_metastore(ctx, 0) != 0) return -1;
2205   ___jlog_resync_index(ctx, ctx->meta->storage_log, id, NULL);
2206   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
2207   return -1;
2208 }
2209
2210 int jlog_ctx_advance_id(jlog_ctx *ctx, jlog_id *cur,
2211                         jlog_id *start, jlog_id *finish)
2212 {
2213   int rv;
2214   if(memcmp(cur, finish, sizeof(jlog_id))) {
2215     start->marker++;
2216   } else {
2217     if((rv = __jlog_find_first_log_after(ctx, cur, start, finish)) != 0) {
2218       return rv;
2219     }
2220     if(cur->log != start->log) {
2221       start->marker = 1;
2222     }
2223     else start->marker = cur->marker;
2224   }
2225   return 0;
2226 }
2227
2228 static int is_datafile(const char *f, u_int32_t *logid) {
2229   int i;
2230   u_int32_t l = 0;
2231   for(i=0; i<8; i++) {
2232     if((f[i] >= '0' && f[i] <= '9') ||
2233        (f[i] >= 'a' && f[i] <= 'f')) {
2234       l <<= 4;
2235       l |= (f[i] < 'a') ? (f[i] - '0') : (f[i] - 'a' + 10);
2236     }
2237     else
2238       return 0;
2239   }
2240   if(f[i] != '\0') return 0;
2241   if(logid) *logid = l;
2242   return 1;
2243 }
2244
2245 int jlog_clean(const char *file) {
2246   int rv = -1;
2247   u_int32_t earliest = 0;
2248   jlog_ctx *log;
2249   DIR *dir;
2250   struct dirent *de;
2251
2252   log = jlog_new(file);
2253   jlog_ctx_open_writer(log);
2254   dir = opendir(file);
2255   if(!dir) goto out;
2256
2257   earliest = 0;
2258   if(jlog_pending_readers(log, 0, &earliest) < 0) goto out;
2259
2260   rv = 0;
2261   while((de = readdir(dir)) != NULL) {
2262     u_int32_t logid;
2263     if(is_datafile(de->d_name, &logid) && logid < earliest) {
2264       char fullfile[MAXPATHLEN];
2265       char fullidx[MAXPATHLEN];
2266
2267       memset(fullfile, 0, sizeof(fullfile));
2268       memset(fullidx, 0, sizeof(fullidx));
2269       snprintf(fullfile, sizeof(fullfile), "%s/%s", file, de->d_name);
2270       snprintf(fullidx, sizeof(fullidx), "%s/%s" INDEX_EXT, file, de->d_name);
2271       (void)unlink(fullfile);
2272       (void)unlink(fullidx); /* this may not exist; don't care */
2273       rv++;
2274     }
2275   }
2276   closedir(dir);
2277  out:
2278   jlog_ctx_close(log);
2279   return rv;
2280 }
2281
2282 /* ------------------ jlog_ctx_repair() and friends ----------- */
2283
2284 /*
2285   This code attempts to repair problems with the metastore file and
2286   also a checkpoint file, within a jlog directory. The top level
  function takes an integer parameter and returns 1 on success or 0
  on failure. If the argument is zero, only non-aggressive repairs
  are attempted. If the argument is non-zero and the non-aggressive
  repairs were not successful, an aggressive repair is attempted
  instead. This consists of (a) deleting all files in the log
  directory and (b) deleting the log directory itself.
2294
2295   The reader will note that some of this functionality is addressed
2296   by other code within this file. An early decision was made not
2297   to reuse any of this code, but rather to attempt a solution from
2298   first principles. This is not due to a bad case of NIH, instead
2299   it is due to a desire to implement all and only the behaviors
2300   stated, without any (apparent) possibility of side effects.
2301
2302   The reader will also notice that this code uses memory allocation
  for filenames and directory paths, rather than fixed-size local arrays
  of MAXPATHLEN bytes. This is also intentional. Large stack-allocated
  locals (typically 4k each) can cause stack problems on some systems.
  The compiler should do the right thing, but that does not mean that
  it will do the right thing.
2308 */
2309
/*
 * findel: find the earliest and latest data files in `dir`, i.e. entries
 * whose names are exactly eight hex digits.  The smallest and largest values
 * are stored through *earp and *latp.  Either pointer may be NULL, and each
 * is left untouched if no such file exists.
 *
 * Returns 1 if the directory yielded at least two entries (a valid
 * directory has at least "." and ".."), else 0 (including dir == NULL).
 */
static int findel(DIR *dir, unsigned int *earp, unsigned int *latp) {
  static const char hexdigits[] = "0123456789abcdefABCDEF";
  unsigned int maxx = 0;
  unsigned int minn = 0;
  unsigned int hexx = 0;
  struct dirent *ent;
  int found = 0;
  int nent = 0;

  if ( dir == NULL )
    return 0;
  (void)rewinddir(dir);
  while ( (ent = readdir(dir)) != NULL ) {
    if ( ent->d_name[0] == '\0' )
      continue;
    nent++;
    /* Require all eight characters to be hex digits.  sscanf("%x") alone
       accepts a hex prefix, so a name like "face.txt" would be taken as
       log 0xface. */
    if ( strlen(ent->d_name) != 8 ||
         strspn(ent->d_name, hexdigits) != 8 ||
         sscanf(ent->d_name, "%x", &hexx) != 1 )
      continue;
    if ( !found || hexx > maxx )
      maxx = hexx;
    if ( !found || hexx < minn )
      minn = hexx;
    found = 1;
  }
  if ( found && latp != NULL )
    *latp = maxx;
  if ( found && earp != NULL )
    *earp = minn;
  // a valid directory has at least . and .. entries
  return (nent >= 2);
}
2353
2354 /*
2355   The metastore repair command is:
2356    perl -e 'print pack("IIII", 0xLATEST_FILE_HERE, 4*1024*1024, 1, 0x663A7318);
2357       > metastore
2358    The final hex number is known as DEFAULT_HDR_MAGIC
2359 */
2360
2361 static int metastore_ok_p(char *ag, unsigned int lat) {
2362   int fd = open(ag, O_RDONLY);
2363   FASSERT(fd >= 0, "cannot open metastore file");
2364   if ( fd < 0 )
2365     return 0;
2366   // now we use a very slightly tricky way to get the filesize on
2367   // systems that don't necessarily have <sys/stat.h>
2368   off_t oof = lseek(fd, 0, SEEK_END);
2369   (void)lseek(fd, 0, SEEK_SET);
2370   size_t fourI = 4*sizeof(unsigned int);
2371   FASSERT(oof == (off_t)fourI, "metastore size invalid");
2372   if ( oof != (off_t)fourI ) {
2373     (void)close(fd);
2374     return 0;
2375   }
2376   unsigned int goal[4];
2377   goal[0] = lat;
2378   goal[1] = 4*1024*1024;
2379   goal[2] = 1;
2380   goal[3] = DEFAULT_HDR_MAGIC;
2381   unsigned int have[4];
2382   int rd = read(fd, &have[0], fourI);
2383   (void)close(fd);
2384   fd = -1;
2385   FASSERT(rd == fourI, "read error on metastore file");
2386   if ( rd != fourI )
2387     return 0;
2388   int gotem = 0;
2389   int i;
2390   for(i=0;i<4;i++) {
2391     if ( goal[i] == have[i] )
2392       gotem++;
2393   }
2394   FASSERT(gotem == 4, "metastore contents incorrect");
2395   return (gotem == 4);
2396 }
2397
2398 static int repair_metastore(const char *pth, unsigned int lat) {
2399   if ( pth == NULL || pth[0] == '\0' ) {
2400     FASSERT(0, "invalid metastore path");
2401     return 0;
2402   }
2403   size_t leen = strlen(pth);
2404   if ( (leen == 0) || (leen > (MAXPATHLEN-12)) ) {
2405     FASSERT(0, "invalid metastore path length");
2406     return 0;
2407   }
2408   size_t leen2 = leen + strlen("metastore") + 4;
2409   char *ag = (char *)calloc(leen2, sizeof(char));
2410   if ( ag == NULL )             /* out of memory, so bail */
2411     return 0;
2412   (void)snprintf(ag, leen2-1, "%s%cmetastore", pth, IFS_CH);
2413   int b = metastore_ok_p(ag, lat);
2414   FASSERT(b, "metastore integrity check failed");
2415   unsigned int goal[4];
2416   goal[0] = lat;
2417   goal[1] = 4*1024*1024;
2418   goal[2] = 1;
2419   goal[3] = DEFAULT_HDR_MAGIC;
2420   (void)unlink(ag);             /* start from scratch */
2421   int fd = creat(ag, DEFAULT_FILE_MODE);
2422   free((void *)ag);
2423   ag = NULL;
2424   FASSERT(fd >= 0, "cannot create new metastore file");
2425   if ( fd < 0 )
2426     return 0;
2427   int wr = write(fd, &goal[0], sizeof(goal));
2428   (void)close(fd);
2429   FASSERT(wr == sizeof(goal), "cannot write new metastore file");
2430   return (wr == sizeof(goal));
2431 }
2432
2433 static int new_checkpoint(char *ag, int fd, unsigned int ear) {
2434   int newfd = 0;
2435   int sta = 0;
2436   if ( ag == NULL || ag[0] == '\0' ) {
2437     FASSERT(0, "invalid checkpoint path");
2438     return 0;
2439   }
2440   if ( fd < 0 ) {
2441     (void)unlink(ag);
2442     fd = creat(ag, DEFAULT_FILE_MODE);
2443     FASSERT(fd >= 0, "cannot create checkpoint file");
2444     if ( fd < 0 )
2445       return 0;
2446     else
2447       newfd = 1;
2448   }
2449   int x = ftruncate(fd, 0);
2450   FASSERT(x >= 0, "ftruncate failed to zero out checkpoint file");
2451   if ( x >= 0 ) {
2452     off_t xcvR = lseek(fd, 0, SEEK_SET);
2453     FASSERT(xcvR == 0, "cannot seek to beginning of checkpoint file");
2454     if ( xcvR == 0 ) {
2455       unsigned int goal[2];
2456       goal[0] = ear;
2457       goal[1] = 0;
2458       int wr = write(fd, goal, sizeof(goal));
2459       FASSERT(wr == sizeof(goal), "cannot write checkpoint file");
2460       sta = (wr == sizeof(goal));
2461     }
2462   }
2463   if ( newfd == 1 )
2464     (void)close(fd);
2465   return sta;
2466 }
2467
/* Extra bytes added when sizing "<dir><IFS_CH><name>" buffers below: room
   for the separator and the terminating NUL, plus some slack. */
static const int five = 5;
2469
2470 static int repair_checkpointfile(DIR *dir, const char *pth, unsigned int ear) {
2471   FASSERT(dir != NULL, "invalid directory");
2472   if ( dir == NULL )
2473     return 0;
2474   struct dirent *ent = NULL;
2475   char *ag = NULL;
2476   int   fd = -1;
2477
2478   (void)rewinddir(dir);
2479   size_t twoI = 2*sizeof(unsigned int);
2480   int sta = 0;
2481   while ( (ent = readdir(dir)) != NULL ) {
2482     if ( ent->d_name[0] != '\0' ) {
2483       if ( strncmp(ent->d_name, "cp.", 3) == 0 ) {
2484         char n[3];
2485         n[0] = ent->d_name[3];
2486         if ( n[0] != 0 )
2487           n[1] = ent->d_name[4];
2488         else
2489           n[1] = 0;
2490         n[2] = 0;
2491         int tilde = (int)'~';
2492         int mtilde = 0;
2493         if ( (sscanf(n, "%d", &mtilde) != 1) || (mtilde != tilde ) ) {
2494           sta = 1;
2495           break;
2496         }
2497       }
2498     }
2499   }
2500   FASSERT(sta, "could not find a checkpoint file");
2501   // we cannot simply create a checkpoint file if we don't have the
2502   // filename, so there is nothing to do here
2503   if ( sta == 0 )
2504     return 1;
2505   size_t leen = strlen(pth) + strlen(ent->d_name) + five;
2506   FASSERT(leen < MAXPATHLEN, "invalid checkpoint path length");
2507   if ( leen >= MAXPATHLEN )
2508     return 0;
2509   ag = (char *)calloc(leen+1, sizeof(char));
2510   if ( ag == NULL )     /* out of memory, so bail */
2511     return 0;
2512   (void)snprintf(ag, leen-1, "%s%c%s", pth, IFS_CH, ent->d_name);
2513   unsigned int goal[2];
2514   goal[0] = ear;
2515   goal[1] = 0;
2516   fd = open(ag, O_RDWR);
2517   sta = 0;
2518   FASSERT(fd >= 0, "cannot open checkpoint file");
2519   if ( fd >= 0 ) {
2520     off_t oof = lseek(fd, 0, SEEK_END);
2521     (void)lseek(fd, 0, SEEK_SET);
2522     FASSERT(oof != (off_t)twoI, "checkpoint file size incorrect");
2523     if ( oof == (off_t)twoI ) {
2524       unsigned int have[2];
2525       int rd = read(fd, have, sizeof(have));
2526       FASSERT(rd == sizeof(have), "cannot read checkpoint file");
2527       if ( rd == sizeof(have) ) {
2528         if ( (goal[0] != have[0]) || (goal[1] != have[1]) ) {
2529           FASSERT(0, "invalid checkpoint data");
2530         } else
2531           sta = 1;
2532       }
2533     }
2534   }
2535   if ( sta == 0 ) {
2536     sta = new_checkpoint(ag, fd, ear);
2537     FASSERT(sta, "cannot create new checkpoint file");
2538   }
2539   if ( fd >= 0 )
2540     (void)close(fd);
2541   if ( ag != NULL )
2542     (void)free((void *)ag);
2543   return sta;
2544 }
2545
/*
 * Compiled out (#if 0): helpers to build a path in the parent of the jlog
 * directory, used only by the equally disabled try_to_save_fasserts().
 * NOTE(review): fix before re-enabling --
 *  - findparentdirectory() hands strrchr() only the last one or two
 *    characters of pth (&pth[strt]), so for "/a/b" sep is NULL and the
 *    pointer arithmetic on it is undefined; for "/a/b/" it finds the
 *    trailing separator instead of the parent's.
 *  - makeparentname() copies strlen(pdir) bytes into a buffer of only fnlen
 *    bytes (heap overflow for long paths) and leaks pdir when calloc fails.
 */
#if 0

// we want a directory of the form DIRsepDIRsep, with a separator
// already at the end

static const char *findparentdirectory(const char *pth, int *off2fnp) {
  size_t strt = 0;
  size_t leen = strlen(pth);
  // special case: the top level directory
  if ( leen == 1 && pth[0] == IFS_CH ) {
    *off2fnp = 1;
    return strdup(pth);
  }
  // is the last char already a sep?
  if ( pth[leen-1] == IFS_CH )
    strt = leen - 2;
  else
    strt = leen - 1;
  char *sep = strrchr(&pth[strt], IFS_CH);
  *off2fnp = (int)(sep - &pth[0]);
  char *ag = strdup(pth);
  if ( ag == NULL )
    return NULL;
  ag[*off2fnp] = '\0';
  return ag;
}

static char *makeparentname(const char *pth, size_t fnlen, int *off2fnp) {
  const char *pdir = findparentdirectory(pth, off2fnp);
  if ( pdir == NULL || pdir[0] == '\0' )
    return NULL;
  char *ag = (char *)calloc(fnlen, sizeof(char));
  if ( ag == NULL )
    return(NULL);
  (void)memcpy((void *)ag, pdir, strlen(pdir));
  free((void *)pdir);
  return ag;
}

#endif
2586
/* Singly linked list of file names queued while a directory is being read,
   so they can be acted on after the readdir() loop finishes. */
typedef struct _strlist strlist;

struct _strlist {
  char *entry;              /* heap-allocated copy of a file name */
  struct _strlist *next;    /* next queued name, or NULL */
};

/* Head of the queue: shared, unsynchronized file-scope state. */
static strlist *strhead;
2593
2594 /*
2595   When doing a directory traveral using readdir(), it is not safe to
2596   perform a rename() or unlink() during the traversal. So we have to
2597   save these filenames for processing after the traversal is done.
2598 */
2599
2600 static void schedule_one_file(char *fn) {
2601   if ( fn == NULL || fn[0] == '\0' )
2602     return;
2603   strlist *snew = (strlist *)calloc(1, sizeof(strlist));
2604   if ( snew == NULL )           /* no memory, bail */
2605     return;
2606   snew->entry = strdup(fn);
2607   if ( snew->entry == NULL )
2608     return;                     /* dangerous to free memory, if out of mem */
2609   snew->next = strhead;
2610   strhead = snew;
2611 }
2612
2613 static void destroy_all_schedule_memory(void)
2614 {
2615   strlist *runn = strhead;
2616   while ( runn != NULL ) {
2617     strlist *nxt = runn->next;
2618     if ( runn->entry != NULL ) {
2619       free((void *)(runn->entry));
2620       runn->entry = NULL;
2621     }
2622     free((void *)runn);
2623     runn = nxt;
2624   }
2625   strhead = NULL;
2626 }
2627
/*
 * Compiled out (#if 0): rename each queued name from the jlog directory
 * into the parent buffer produced by makeparentname(), then empty the
 * queue.  Only used by the disabled try_to_save_fasserts().
 * NOTE(review): move_one_file() writes strlen(nam) + 1 bytes into parent at
 * off2fn without knowing parent's size -- confirm the buffer is large
 * enough before re-enabling.
 */
#if 0

static void move_one_file(const char *pth, char *parent, int off2fn,
                          char *nam) {
  size_t leen = strlen(pth) + strlen(nam) + five;
  if ( leen >= MAXPATHLEN )
    return;
  char *ag = (char *)calloc(leen, sizeof(char));
  if ( ag == NULL )
    return;
  (void)snprintf(ag, leen-1, "%s%c%s", pth, IFS_CH, nam);
  (void)memcpy(&parent[off2fn], nam, 1 + strlen(nam)); /* copy the NUL */
  (void)rename(ag, parent);
  free((void *)ag);
}

static void move_the_files(const char *pth, char *parent, int off2fn) {
#ifdef DUSTY_SPRINGFIELD
  (void)printf("I'd like the Dusty Springfield special, with extra dust\n");
#endif
  strlist *runn = strhead;
  while ( runn != NULL ) {
    if ( runn->entry != NULL && runn->entry[0] != '\0' )
      move_one_file(pth, parent, off2fn, runn->entry);
    runn = runn->next;
  }
  destroy_all_schedule_memory();
}

#endif
2658
2659 static void delete_one_file(const char *pth, char *nam) {
2660   size_t leen = strlen(pth) + strlen(nam) + five;
2661   if ( leen >= MAXPATHLEN )
2662     return;
2663   char *ag = (char *)calloc(leen, sizeof(char));
2664   if ( ag == NULL )
2665     return;
2666   (void)snprintf(ag, leen-1, "%s%c%s", pth, IFS_CH, nam);
2667   (void)unlink(ag);
2668   free((void *)ag);
2669 }
2670
2671 static void delete_the_files(const char *pth) {
2672   strlist *runn = strhead;
2673   while ( runn != NULL ) {
2674     if ( runn->entry != NULL && runn->entry[0] != '\0' )
2675       delete_one_file(pth, runn->entry);
2676     runn = runn->next;
2677   }
2678   destroy_all_schedule_memory();
2679 }
2680
/* Compiled out (#if 0), together with the parent-directory and move helpers
   it depends on; kept only for reference (see the "[unused]" step 5 in
   jlog_ctx_repair). */
#if 0

/*
  if there are fassert files in the jlog directory, try to move them
  to the parent directory. It is ok, for the moment, if this fails.
  Also, not to be circular, never call FASSERT() in this function,
  or any of its callees. [unused]
*/

static void try_to_save_fasserts(const char *pth, DIR *dir) {
  if ( pth == NULL || pth[0] == '\0' || dir == NULL )
  return;
  size_t leen = strlen(pth);
  // an fassert file has the form fassertT, where T is the time as a long
  size_t flen = strlen("fassert");
  size_t fnlen = flen + 10 + 3;
  if ( (leen + fnlen) >= MAXPATHLEN )
    return;
  int off2fn = 0;
  char *parent = makeparentname(pth, fnlen, &off2fn);
  if ( parent == NULL )
    return;
  struct dirent *ent;
  (void)rewinddir(dir);
  int ntomove = 0;
  while ( (ent = readdir(dir)) != NULL ) {
    if ( ent->d_name[0] != '\0' ) {
      if ( strncmp("fassert", ent->d_name, flen) == 0 ) {
        // if we attempt to do a rename() during a directory traversal
        // using readdir(), the results will be undesirable
        schedule_one_file(ent->d_name);
        ntomove++;
      }
    }
  }
  if ( ntomove > 0 )
    move_the_files(pth, parent, off2fn);
  free((void *)parent);
}

#endif
2722
/*
  Try as hard as we can to remove all files. Ignore failures of intermediate
  steps, because the user can always manually remove the directory and its
  contents if all else fails.

  We cannot use FASSERT in this function because it might create a new file
  in the directory we are trying to remove.
*/

/*
 * rmcontents_and_dir: unlink every entry of `dir` except "." and "..", then
 * rmdir(pth).  Takes ownership of `dir` (which may be NULL) and closes it on
 * every path.  Returns 1 if the directory itself was removed, else 0.
 */
static int rmcontents_and_dir(const char *pth, DIR *dir) {
  int ntodelete = 0;

  if ( pth == NULL || pth[0] == '\0' ) {
    if ( dir != NULL )
      (void)closedir(dir);      /* we own dir even when bailing out */
    return 0;
  }
  if ( dir != NULL ) {
    struct dirent *ent;
    (void)rewinddir(dir);
    /* Only queue names here; unlinking happens after the scan. */
    while ( (ent = readdir(dir)) != NULL ) {
      if ( ent->d_name[0] == '\0' ||
           strcmp(ent->d_name, ".") == 0 ||
           strcmp(ent->d_name, "..") == 0 )
        continue;
      schedule_one_file(ent->d_name);
      ntodelete++;
    }
    (void)closedir(dir);
  }
  if ( ntodelete > 0 )
    delete_the_files(pth);
  return (rmdir(pth) >= 0);
}
2756
2757 /* exported */
2758 int jlog_ctx_repair(jlog_ctx *ctx, int aggressive) {
2759   // step 1: extract the directory path
2760   const char *pth;
2761   DIR *dir = NULL;
2762
2763   if ( ctx != NULL )
2764     pth = ctx->path;
2765   else
2766     pth = NULL; // fassertxgetpath();
2767   if ( pth == NULL || pth[0] == '\0' ) {
2768     FASSERT(0, "repair command cannot find jlog path");
2769     ctx->last_error = JLOG_ERR_NOTDIR;
2770     return 0;               /* hopeless without a dir name */
2771   }
2772   // step 2: find the earliest and the latest files with hex names
2773   dir = opendir(pth);
2774   FASSERT(dir != NULL, "cannot open jlog directory");
2775   if ( dir == NULL ) {
2776     int bx = 0;
2777     if ( aggressive == 1 )
2778       bx = rmcontents_and_dir(pth, NULL);
2779     if ( bx == 0 )
2780       ctx->last_error = JLOG_ERR_NOTDIR;
2781     else
2782       ctx->last_error = JLOG_ERR_SUCCESS;
2783     return bx;
2784   }
2785   unsigned int ear = 0;
2786   unsigned int lat = 0;
2787   int b0 = findel(dir, &ear, &lat);
2788   FASSERT(b0, "cannot find hex files in jlog directory");
2789   if ( b0 == 1 ) {
2790     // step 3: attempt to repair the metastore. It might not need any
2791     // repair, in which case nothing will happen
2792     int b1 = repair_metastore(pth, lat);
2793     FASSERT(b1, "cannot repair metastore");
2794     // step 4: attempt to repair the checkpoint file. It might not need
2795     // any repair, in which case nothing will happen
2796     int b2 = repair_checkpointfile(dir, pth, ear);
2797     FASSERT(b2, "cannot repair checkpoint file");
2798     // if non-aggressive repair succeeded, then declare success
2799     if ( (b1 == 1) && (b2 == 1) ) {
2800       (void)closedir(dir);
2801       ctx->last_error = JLOG_ERR_SUCCESS;
2802       return 1;
2803     }
2804   }
2805   // if aggressive repair is not authorized, fail
2806   FASSERT(aggressive, "non-aggressive repair failed");
2807   if ( aggressive == 0 ) {
2808     (void)closedir(dir);
2809     ctx->last_error = JLOG_ERR_CREATE_META;
2810     return 0;
2811   }
2812   // step 5: if there are any fassert files, try to save them by
2813   // moving them to the parent directory of "pth". Also make sure
2814   // to close the current fassert file. [unused]
2815   // fassertxend();
2816   // try_to_save_fasserts(pth, dir);
2817   // step 6: destroy the directory with extreme prejudice
2818   int b3 = rmcontents_and_dir(pth, dir);
2819   FASSERT(b3, "Aggressive repair of jlog directory failed");
2820   //  (void)closedir(dir);
2821   if ( b3 == 0 )
2822     ctx->last_error = JLOG_ERR_NOTDIR;
2823   else
2824     ctx->last_error = JLOG_ERR_SUCCESS;
2825   return b3;
2826 }
2827
2828 /* -------------end of jlog_ctx_repair() and friends ----------- */
2829
2830 /* vim:se ts=2 sw=2 et: */
Note: See TracBrowser for help on using the browser.