root/jlog.c

Revision adf2ee8486352700056029fde45a13fc81b71218, 38.8 kB (checked in by Wez Furlong <wez.furlong@messagesystems.com>, 6 years ago)

Sync with the ecelerity jlog sources

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2005-2008, Message Systems, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *    * Redistributions of source code must retain the above copyright
10  *      notice, this list of conditions and the following disclaimer.
11  *    * Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials provided
14  *      with the distribution.
15  *    * Neither the name Message Systems, Inc. nor the names
16  *      of its contributors may be used to endorse or promote products
17  *      derived from this software without specific prior written
18  *      permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 /*****************************************************************
34
35   Journaled logging... append only.
36
37       (1) find current file, or allocate a file, extendible and mark
38           it current.
39  
40       (2) Write records to it, records include their size, so
41           a simple inspection can detect an incomplete trailing
42           record.
43    
44       (3) Write append until the file reaches a certain size.
45
46       (4) Allocate a file, extensible.
47
48       (5) RESYNC INDEX on 'finished' file (see reading:3) and postpend
49           an offset '0' to the index.
50    
51       (6) goto (1)
52    
53   Reading journals...
54
55       (1) find oldest checkpoint of all subscribers, remove all older files.
56
57       (2) (file, last_read) = find_checkpoint for this subscriber
58
59       (3) RESYNC INDEX:
60           open record index for file, seek to the end -  off_t.
61           this is the offset of the last noticed record in this file.
62           open file, seek to this point, roll forward writing the index file
63           _do not_ write an offset for the last record unless it is found
64           complete.
65
66       (4) read entries from last_read+1 -> index of record index
67
68 */
69 #include <stdio.h>
70
71 #include "jlog_config.h"
72 #include "jlog_private.h"
73 #if HAVE_UNISTD_H
74 #include <unistd.h>
75 #endif
76 #if HAVE_SYS_TIME_H
77 #include <sys/time.h>
78 #endif
79 #if HAVE_DIRENT_H
80 #include <dirent.h>
81 #endif
82 #if HAVE_FCNTL_H
83 #include <fcntl.h>
84 #endif
85 #if HAVE_ERRNO_H
86 #include <errno.h>
87 #endif
88 #if HAVE_TIME_H
89 #include <time.h>
90 #endif
91 #if HAVE_SYS_MMAN_H
92 #include <sys/mman.h>
93 #endif
94
95 #define BUFFERED_INDICES 1024
96
97 static jlog_file *__jlog_open_writer(jlog_ctx *ctx);
98 static int __jlog_close_writer(jlog_ctx *ctx);
99 static jlog_file *__jlog_open_reader(jlog_ctx *ctx, u_int32_t log);
100 static int __jlog_close_reader(jlog_ctx *ctx);
101 static int __jlog_close_checkpoint(jlog_ctx *ctx);
102 static jlog_file *__jlog_open_indexer(jlog_ctx *ctx, u_int32_t log);
103 static int __jlog_close_indexer(jlog_ctx *ctx);
104 static int __jlog_resync_index(jlog_ctx *ctx, u_int32_t log, jlog_id *last, int *c);
105 static jlog_file *__jlog_open_named_checkpoint(jlog_ctx *ctx, const char *cpname, int flags);
106 static int __jlog_mmap_reader(jlog_ctx *ctx, u_int32_t log);
107 static int __jlog_munmap_reader(jlog_ctx *ctx);
108
109 int jlog_snprint_logid(char *b, int n, const jlog_id *id) {
110   return snprintf(b, n, "%08x:%08x", id->log, id->marker);
111 }
112
113 int jlog_repair_datafile(jlog_ctx *ctx, u_int32_t log)
114 {
115   jlog_message_header hdr;
116   char *this, *next, *afternext = NULL, *mmap_end;
117   int i, invalid_count = 0;
118   struct {
119     off_t start, end;
120   } *invalid = NULL;
121   off_t orig_len, src, dst, len;
122
123 #define TAG_INVALID(s, e) do { \
124   if (invalid_count) \
125     invalid = realloc(invalid, (invalid_count + 1) * sizeof(*invalid)); \
126   else \
127     invalid = malloc(sizeof(*invalid)); \
128   invalid[invalid_count].start = s - (char *)ctx->mmap_base; \
129   invalid[invalid_count].end = e - (char *)ctx->mmap_base; \
130   invalid_count++; \
131 } while (0)
132
133   ctx->last_error = JLOG_ERR_SUCCESS;
134
135   /* we want the reader's open logic because this runs in the read path
136    * the underlying fds are always RDWR anyway */
137   __jlog_open_reader(ctx, log);
138   if (!ctx->data) {
139     ctx->last_error = JLOG_ERR_FILE_OPEN;
140     ctx->last_errno = errno;
141     return -1;
142   }
143   if (!jlog_file_lock(ctx->data)) {
144     ctx->last_error = JLOG_ERR_LOCK;
145     ctx->last_errno = errno;
146     return -1;
147   }
148   if (__jlog_mmap_reader(ctx, log) != 0)
149     SYS_FAIL(JLOG_ERR_FILE_READ);
150
151   orig_len = ctx->mmap_len;
152   mmap_end = (char*)ctx->mmap_base + ctx->mmap_len;
153   /* these values will cause us to fall right into the error clause and
154    * start searching for a valid header from offset 0 */
155   this = (char*)ctx->mmap_base - sizeof(hdr);
156   hdr.reserved = 0;
157   hdr.mlen = 0;
158
159   while (this + sizeof(hdr) <= mmap_end) {
160     next = this + sizeof(hdr) + hdr.mlen;
161     if (next <= (char *)ctx->mmap_base) goto error;
162     if (next == mmap_end) {
163       this = next;
164       break;
165     }
166     if (next + sizeof(hdr) > mmap_end) goto error;
167     memcpy(&hdr, next, sizeof(hdr));
168     if (hdr.reserved != 0) goto error;
169     this = next;
170     continue;
171   error:
172     for (next = this + sizeof(hdr); next + sizeof(hdr) <= mmap_end; next++) {
173       if (!next[0] && !next[1] && !next[2] && !next[3]) {
174         memcpy(&hdr, next, sizeof(hdr));
175         afternext = next + sizeof(hdr) + hdr.mlen;
176         if (afternext <= (char *)ctx->mmap_base) continue;
177         if (afternext == mmap_end) break;
178         if (afternext + sizeof(hdr) > mmap_end) continue;
179         memcpy(&hdr, afternext, sizeof(hdr));
180         if (hdr.reserved == 0) break;
181       }
182     }
183     /* correct for while loop entry condition */
184     if (this < (char *)ctx->mmap_base) this = ctx->mmap_base;
185     if (next + sizeof(hdr) > mmap_end) break;
186     if (next > this) TAG_INVALID(this, next);
187     this = afternext;
188   }
189   if (this != mmap_end) TAG_INVALID(this, mmap_end);
190
191 #undef TAG_INVALID
192
193 #define MOVE_SEGMENT do { \
194   char cpbuff[4096]; \
195   off_t chunk; \
196   while(len > 0) { \
197     chunk = len; \
198     if (chunk > sizeof(cpbuff)) chunk = sizeof(cpbuff); \
199     if (!jlog_file_pread(ctx->data, &cpbuff, chunk, src)) \
200       SYS_FAIL(JLOG_ERR_FILE_READ); \
201     if (!jlog_file_pwrite(ctx->data, &cpbuff, chunk, dst)) \
202       SYS_FAIL(JLOG_ERR_FILE_WRITE); \
203     src += chunk; \
204     dst += chunk; \
205     len -= chunk; \
206   } \
207 } while (0)
208
209   if (invalid_count > 0) {
210     __jlog_munmap_reader(ctx);
211     src = 0;
212     dst = invalid[0].start;
213     for (i = 0; i < invalid_count - 1; ) {
214       src = invalid[i].end;
215       len = invalid[++i].start - src;
216       MOVE_SEGMENT;
217     }
218     src = invalid[invalid_count - 1].end;
219     len = orig_len - src;
220     if (len > 0) MOVE_SEGMENT;
221     if (!jlog_file_truncate(ctx->data, dst))
222       SYS_FAIL(JLOG_ERR_FILE_WRITE);
223   }
224
225 #undef MOVE_SEGMENT
226
227 finish:
228   jlog_file_unlock(ctx->data);
229   if (invalid) free(invalid);
230   if (ctx->last_error != JLOG_ERR_SUCCESS) return -1;
231   return invalid_count;
232 }
233
234 int jlog_inspect_datafile(jlog_ctx *ctx, u_int32_t log)
235 {
236   jlog_message_header hdr;
237   char *this, *next, *mmap_end;
238   int i;
239   time_t timet;
240   struct tm tm;
241   char tbuff[128];
242
243   ctx->last_error = JLOG_ERR_SUCCESS;
244
245   __jlog_open_reader(ctx, log);
246   if (!ctx->data)
247     SYS_FAIL(JLOG_ERR_FILE_OPEN);
248   if (__jlog_mmap_reader(ctx, log) != 0)
249     SYS_FAIL(JLOG_ERR_FILE_READ);
250
251   mmap_end = (char*)ctx->mmap_base + ctx->mmap_len;
252   this = ctx->mmap_base;
253   i = 0;
254   while (this + sizeof(hdr) <= mmap_end) {
255     memcpy(&hdr, this, sizeof(hdr));
256     i++;
257     if (hdr.reserved != 0) {
258       fprintf(stderr, "Message %d at [%ld] has invalid reserved value %u\n",
259               i, (long int)(this - (char *)ctx->mmap_base), hdr.reserved);
260       return 1;
261     }
262
263     fprintf(stderr, "Message %d at [%ld] of (%lu+%u)", i,
264             (long int)(this - (char *)ctx->mmap_base), sizeof(hdr), hdr.mlen);
265
266     next = this + sizeof(hdr) + hdr.mlen;
267     if (next <= (char *)ctx->mmap_base) {
268       fprintf(stderr, " WRAPPED TO NEGATIVE OFFSET!\n");
269       return 1;
270     }
271     if (next > mmap_end) {
272       fprintf(stderr, " OFF THE END!\n");
273       return 1;
274     }
275
276     timet = hdr.tv_sec;
277     localtime_r(&timet, &tm);
278     strftime(tbuff, sizeof(tbuff), "%c", &tm);
279     fprintf(stderr, "\n\ttime: %s\n\tmlen: %u\n", tbuff, hdr.mlen);
280     this = next;
281   }
282   if (this < mmap_end) {
283     fprintf(stderr, "%ld bytes of junk at the end\n",
284             (long int)(mmap_end - this));
285     return 1;
286   }
287
288   return 0;
289 finish:
290   return -1;
291 }
292
293 int jlog_idx_details(jlog_ctx *ctx, u_int32_t log,
294                      u_int32_t *marker, int *closed)
295 {
296   off_t index_len;
297   u_int64_t index;
298
299   __jlog_open_indexer(ctx, log);
300   if (!ctx->index)
301     SYS_FAIL(JLOG_ERR_IDX_OPEN);
302   if ((index_len = jlog_file_size(ctx->index)) == -1)
303     SYS_FAIL(JLOG_ERR_IDX_SEEK);
304   if (index_len % sizeof(u_int64_t))
305     SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
306   if (index_len > sizeof(u_int64_t)) {
307     if (!jlog_file_pread(ctx->index, &index, sizeof(u_int64_t),
308                          index_len - sizeof(u_int64_t)))
309     {
310       SYS_FAIL(JLOG_ERR_IDX_READ);
311     }
312     if (index) {
313       *marker = index_len / sizeof(u_int64_t);
314       *closed = 0;
315     } else {
316       *marker = (index_len / sizeof(u_int64_t)) - 1;
317       *closed = 1;
318     }
319   } else {
320     *marker = index_len / sizeof(u_int64_t);
321     *closed = 0;
322   }
323
324   return 0;
325 finish:
326   return -1;
327 }
328
329 static int __jlog_unlink_datafile(jlog_ctx *ctx, u_int32_t log) {
330   char file[MAXPATHLEN];
331
332   if(ctx->current_log == log) {
333     __jlog_close_reader(ctx);
334     __jlog_close_indexer(ctx);
335   }
336
337   STRSETDATAFILE(ctx, file, log);
338 #ifdef DEBUG
339   fprintf(stderr, "unlinking %s\n", file);
340 #endif
341   unlink(file);
342
343   strcat(file, INDEX_EXT);
344 #ifdef DEBUG
345   fprintf(stderr, "unlinking %s\n", file);
346 #endif
347   unlink(file);
348   return 0;
349 }
350
351 static int __jlog_open_metastore(jlog_ctx *ctx)
352 {
353   char file[MAXPATHLEN];
354   int len;
355
356 #ifdef DEBUG
357   fprintf(stderr, "__jlog_open_metastore\n");
358 #endif
359   strcpy(file, ctx->path);
360   len = strlen(ctx->path);
361   file[len++] = IFS_CH;
362   strcpy(&file[len], "metastore");
363
364   ctx->metastore = jlog_file_open(file, O_CREAT, ctx->file_mode);
365
366   if (!ctx->metastore) {
367     ctx->last_errno = errno;
368     ctx->last_error = JLOG_ERR_CREATE_META;
369     return -1;
370   }
371
372   return 0;
373 }
374
375 /* exported */
376 int __jlog_pending_readers(jlog_ctx *ctx, u_int32_t log) {
377   int readers;
378   DIR *dir;
379   struct dirent *ent;
380   char file[MAXPATHLEN];
381   int len;
382   jlog_id id;
383
384   readers = 0;
385
386   dir = opendir(ctx->path);
387   if (!dir) return -1;
388  
389   strcpy(file, ctx->path);
390   len = strlen(ctx->path);
391   file[len++] = IFS_CH;
392   file[len] = '\0';
393
394   while ((ent = readdir(dir))) {
395     if (ent->d_name[0] == 'c' && ent->d_name[1] == 'p' && ent->d_name[2] == '.') {
396       jlog_file *cp;
397
398       strcpy(file + len, ent->d_name);
399 #ifdef DEBUG
400       fprintf(stderr, "Checking if %s needs %s...\n", ent->d_name, ctx->path);
401 #endif
402       if ((cp = jlog_file_open(file, 0, ctx->file_mode))) {
403         if (jlog_file_lock(cp)) {
404           jlog_file_pread(cp, &id, sizeof(id), 0);
405 #ifdef DEBUG
406           fprintf(stderr, "\t%u <= %u (pending reader)\n", id.log, log);
407 #endif
408           if (id.log <= log) {
409             readers++;
410           }
411           jlog_file_unlock(cp);
412         }
413         jlog_file_close(cp);
414       }
415     }
416   }
417   closedir(dir);
418   return readers;
419 }
/*
 * Growable list of subscriber names, used while
 * jlog_ctx_list_subscribers() collects its result.
 */
struct _jlog_subs {
  char **subs;    /* array of heap-allocated name strings */
  int used;       /* number of names currently stored in subs */
  int allocd;     /* number of slots allocated for subs */
};
425
426 int jlog_ctx_list_subscribers_dispose(jlog_ctx *ctx, char **subs) {
427   char *s;
428   int i = 0;
429   if(subs) {
430     while((s = subs[i++]) != NULL) free(s);
431     free(subs);
432   }
433   return 0;
434 }
435
436 int jlog_ctx_list_subscribers(jlog_ctx *ctx, char ***subs) {
437   struct _jlog_subs js = { NULL, 0, 0 };
438   DIR *dir;
439   struct dirent *ent;
440   unsigned char file[MAXPATHLEN];
441   char *p;
442   int len;
443
444   js.subs = calloc(16, sizeof(char *));
445   js.allocd = 16;
446
447   dir = opendir(ctx->path);
448   if (!dir) return -1;
449   while ((ent = readdir(dir))) {
450     if (ent->d_name[0] == 'c' && ent->d_name[1] == 'p' && ent->d_name[2] == '.') {
451
452       for (len = 0, p = ent->d_name + 3; *p;) {
453         unsigned char c;
454         int i;
455
456         for (c = 0, i = 0; i < 16; i++) {
457           if (__jlog_hexchars[i] == *p) {
458             c = i << 4;
459             break;
460           }
461         }
462         p++;
463         for (i = 0; i < 16; i++) {
464           if (__jlog_hexchars[i] == *p) {
465             c |= i;
466             break;
467           }
468         }
469         p++;
470         file[len++] = c;
471       }
472       file[len] = '\0';
473
474       js.subs[js.used++] = strdup((char *)file);
475       if(js.used == js.allocd) {
476         js.allocd *= 2;
477         js.subs = realloc(js.subs, js.allocd*sizeof(char *));
478       }
479       js.subs[js.used] = NULL;
480     }
481   }
482   closedir(dir);
483   *subs = js.subs;
484   return js.used;
485 }
486
487 static int __jlog_save_metastore(jlog_ctx *ctx, int ilocked)
488 {
489   struct _jlog_meta_info info;
490 #ifdef DEBUG
491   fprintf(stderr, "__jlog_save_metastore\n");
492 #endif
493
494   if (!ilocked && !jlog_file_lock(ctx->metastore)) {
495     return -1;
496   }
497
498   info.storage_log = ctx->storage.log;
499   info.unit_limit = ctx->unit_limit;
500   info.safety = ctx->safety;
501
502   if (!jlog_file_pwrite(ctx->metastore, &info, sizeof(info), 0)) {
503     if (!ilocked) jlog_file_unlock(ctx->metastore);
504     return -1;
505   }
506   if (ctx->safety == JLOG_SAFE) {
507     jlog_file_sync(ctx->metastore);
508   }
509
510   if (!ilocked) jlog_file_unlock(ctx->metastore);
511   return 0;
512 }
513
514 static int __jlog_restore_metastore(jlog_ctx *ctx, int ilocked)
515 {
516   struct _jlog_meta_info info;
517 #ifdef DEBUG
518   fprintf(stderr, "__jlog_restore_metastore\n");
519 #endif
520
521   if (!ilocked && !jlog_file_lock(ctx->metastore)) {
522     return -1;
523   }
524
525   if (!jlog_file_pread(ctx->metastore, &info, sizeof(info), 0)) {
526     if (!ilocked) jlog_file_unlock(ctx->metastore);
527     return -1;
528   }
529
530   if (!ilocked) jlog_file_unlock(ctx->metastore);
531
532   ctx->storage.log = info.storage_log;
533   ctx->unit_limit = info.unit_limit;
534   ctx->safety = info.safety;
535
536   return 0;
537 }
538
539 int jlog_get_checkpoint(jlog_ctx *ctx, const char *s, jlog_id *id) {
540   jlog_file *f;
541   int rv = -1;
542
543   if(ctx->subscriber_name && !strcmp(ctx->subscriber_name, s)) {
544     if(!ctx->checkpoint) {
545       ctx->checkpoint = __jlog_open_named_checkpoint(ctx, s, 0);
546     }
547     f = ctx->checkpoint;
548   } else
549     f = __jlog_open_named_checkpoint(ctx, s, 0);
550
551   if (f) {
552     if (jlog_file_lock(f)) {
553       if (jlog_file_pread(f, id, sizeof(*id), 0)) rv = 0;
554       jlog_file_unlock(f);
555     }
556   }
557   if (f && f != ctx->checkpoint) jlog_file_close(f);
558   return rv;
559 }
560
561 static int __jlog_set_checkpoint(jlog_ctx *ctx, const char *s, const jlog_id *id)
562 {
563   jlog_file *f;
564   int rv = -1;
565   jlog_id old_id;
566   u_int32_t log;
567
568   if(ctx->subscriber_name && !strcmp(ctx->subscriber_name, s)) {
569     if(!ctx->checkpoint) {
570       ctx->checkpoint = __jlog_open_named_checkpoint(ctx, s, 0);
571     }
572     f = ctx->checkpoint;
573   } else
574     f = __jlog_open_named_checkpoint(ctx, s, 0);
575
576   if(!f) return -1;
577   if (!jlog_file_lock(f))
578     goto failset;
579
580   if (jlog_file_size(f) == 0) {
581     /* we're setting it for the first time, no segments were pending on it */
582     old_id.log = id->log;
583   } else {
584     if (!jlog_file_pread(f, &old_id, sizeof(old_id), 0))
585       goto failset;
586   }
587   if (!jlog_file_pwrite(f, id, sizeof(*id), 0))
588     goto failset;
589   if (ctx->safety == JLOG_SAFE) {
590     jlog_file_sync(f);
591   }
592   jlog_file_unlock(f);
593   rv = 0;
594
595   for (log = old_id.log; log < id->log; log++) {
596     if (__jlog_pending_readers(ctx, log) == 0) {
597       __jlog_unlink_datafile(ctx, log);
598     }
599   }
600
601  failset:
602   if (f && f != ctx->checkpoint) jlog_file_close(f);
603   return rv;
604 }
605
606 static int __jlog_close_metastore(jlog_ctx *ctx) {
607   if (ctx->metastore) {
608     jlog_file_close(ctx->metastore);
609     ctx->metastore = NULL;
610   }
611   return 0;
612 }
613
614 /* path is assumed to be MAXPATHLEN */
615 static char *compute_checkpoint_filename(jlog_ctx *ctx, const char *subscriber, char *name)
616 {
617   const char *sub;
618   int len;
619
620   /* build checkpoint filename */
621   len = strlen(ctx->path);
622   strcpy(name, ctx->path);
623   name[len++] = IFS_CH;
624   name[len++] = 'c';
625   name[len++] = 'p';
626   name[len++] = '.';
627   for (sub = subscriber; *sub; ) {
628     name[len++] = __jlog_hexchars[((*sub & 0xf0) >> 4)];
629     name[len++] = __jlog_hexchars[(*sub & 0x0f)];
630     sub++;
631   }
632   name[len++] = '\0';
633
634 #ifdef DEBUG
635   fprintf(stderr, "checkpoint %s filename is %s\n", subscriber, name);
636 #endif
637   return name;
638 }
639
640 static jlog_file *__jlog_open_named_checkpoint(jlog_ctx *ctx, const char *cpname, int flags)
641 {
642   char name[MAXPATHLEN];
643   compute_checkpoint_filename(ctx, cpname, name);
644   return jlog_file_open(name, flags, ctx->file_mode);
645 }
646
647 static jlog_file *__jlog_open_reader(jlog_ctx *ctx, u_int32_t log) {
648   char file[MAXPATHLEN];
649
650   if(ctx->current_log != log) {
651     __jlog_close_reader(ctx);
652     __jlog_close_indexer(ctx);
653   }
654   if(ctx->data) {
655     return ctx->data;
656   }
657   STRSETDATAFILE(ctx, file, log);
658 #ifdef DEBUG
659   fprintf(stderr, "opening log file[ro]: '%s'\n", file);
660 #endif
661   ctx->data = jlog_file_open(file, 0, ctx->file_mode);
662   ctx->current_log = log;
663   return ctx->data;
664 }
665
666 static int __jlog_munmap_reader(jlog_ctx *ctx) {
667   if(ctx->mmap_base) {
668     munmap(ctx->mmap_base, ctx->mmap_len);
669     ctx->mmap_base = NULL;
670     ctx->mmap_len = 0;
671   }
672   return 0;
673 }
674
675 static int __jlog_mmap_reader(jlog_ctx *ctx, u_int32_t log) {
676   if(ctx->current_log == log && ctx->mmap_base) return 0;
677   __jlog_open_reader(ctx, log);
678   if(!ctx->data)
679     return -1;
680   if (!jlog_file_map_read(ctx->data, &ctx->mmap_base, &ctx->mmap_len)) {
681     ctx->mmap_base = NULL;
682     ctx->last_error = JLOG_ERR_FILE_READ;
683     ctx->last_errno = errno;
684     return -1;
685   }
686   return 0;
687 }
688
689 static jlog_file *__jlog_open_writer(jlog_ctx *ctx) {
690   char file[MAXPATHLEN];
691
692   if(ctx->data) {
693     /* Still open */
694     return ctx->data;
695   }
696
697   if(!jlog_file_lock(ctx->metastore))
698     SYS_FAIL(JLOG_ERR_LOCK);
699   if(__jlog_restore_metastore(ctx, 1))
700     SYS_FAIL(JLOG_ERR_META_OPEN);
701   STRSETDATAFILE(ctx, file, ctx->storage.log);
702 #ifdef DEBUG
703   fprintf(stderr, "opening log file[rw]: '%s'\n", file);
704 #endif
705   ctx->data = jlog_file_open(file, O_CREAT, ctx->file_mode);
706  finish:
707   jlog_file_unlock(ctx->metastore);
708   return ctx->data;
709 }
710
711 static int __jlog_close_writer(jlog_ctx *ctx) {
712   if (ctx->data) {
713     jlog_file_close(ctx->data);
714     ctx->data = NULL;
715   }
716   return 0;
717 }
718
719 static int __jlog_close_reader(jlog_ctx *ctx) {
720   __jlog_munmap_reader(ctx);
721   if (ctx->data) {
722     jlog_file_close(ctx->data);
723     ctx->data = NULL;
724   }
725   return 0;
726 }
727
728 static int __jlog_close_checkpoint(jlog_ctx *ctx) {
729   if (ctx->checkpoint) {
730     jlog_file_close(ctx->checkpoint);
731     ctx->checkpoint = NULL;
732   }
733   return 0;
734 }
735
736 static jlog_file *__jlog_open_indexer(jlog_ctx *ctx, u_int32_t log) {
737   char file[MAXPATHLEN];
738
739   if(ctx->current_log != log) {
740     __jlog_close_reader(ctx);
741     __jlog_close_indexer(ctx);
742   }
743   if(ctx->index) {
744     return ctx->index;
745   }
746   STRSETDATAFILE(ctx, file, log);
747   strcat(file, INDEX_EXT);
748 #ifdef DEBUG
749   fprintf(stderr, "opening index file: '%s'\n", idx);
750 #endif
751   ctx->index = jlog_file_open(file, O_CREAT, ctx->file_mode);
752   ctx->current_log = log;
753   return ctx->index;
754 }
755
756 static int __jlog_close_indexer(jlog_ctx *ctx) {
757   if (ctx->index) {
758     jlog_file_close(ctx->index);
759     ctx->index = NULL;
760   }
761   return 0;
762 }
763
764 static int
765 ___jlog_resync_index(jlog_ctx *ctx, u_int32_t log, jlog_id *last,
766                      int *closed) {
767   jlog_message_header logmhdr;
768   int i, second_try = 0;
769   off_t index_off, data_off, data_len;
770   u_int64_t index;
771   u_int64_t indices[BUFFERED_INDICES];
772
773   ctx->last_error = JLOG_ERR_SUCCESS;
774   if(closed) *closed = 0;
775
776   __jlog_open_reader(ctx, log);
777   if (!ctx->data) {
778     ctx->last_error = JLOG_ERR_FILE_OPEN;
779     ctx->last_errno = errno;
780     return -1;
781   }
782
783 #define RESTART do { \
784   if (second_try == 0) { \
785     jlog_file_truncate(ctx->index, 0); \
786     jlog_file_unlock(ctx->index); \
787     second_try = 1; \
788     ctx->last_error = JLOG_ERR_SUCCESS; \
789     goto restart; \
790   } \
791   SYS_FAIL(JLOG_ERR_IDX_CORRUPT); \
792 } while (0)
793
794 restart:
795   __jlog_open_indexer(ctx, log);
796   if (!ctx->index) {
797     ctx->last_error = JLOG_ERR_IDX_OPEN;
798     ctx->last_errno = errno;
799     return -1;
800   }
801   if (!jlog_file_lock(ctx->index)) {
802     ctx->last_error = JLOG_ERR_LOCK;
803     ctx->last_errno = errno;
804     return -1;
805   }
806
807   data_off = 0;
808   if ((data_len = jlog_file_size(ctx->data)) == -1)
809     SYS_FAIL(JLOG_ERR_FILE_SEEK);
810   if ((index_off = jlog_file_size(ctx->index)) == -1)
811     SYS_FAIL(JLOG_ERR_IDX_SEEK);
812
813   if (index_off % sizeof(u_int64_t)) {
814 #ifdef DEBUG
815     fprintf(stderr, "corrupt index [%llu]\n", index_off);
816 #endif
817     RESTART;
818   }
819
820   if (index_off > sizeof(u_int64_t)) {
821     if (!jlog_file_pread(ctx->index, &index, sizeof(index),
822                          index_off - sizeof(u_int64_t)))
823     {
824       SYS_FAIL(JLOG_ERR_IDX_READ);
825     }
826     if (index == 0) {
827       /* This log file has been "closed" */
828 #ifdef DEBUG
829       fprintf(stderr, "index closed\n");
830 #endif
831       if(last) {
832         last->log = log;
833         last->marker = (index_off / sizeof(u_int64_t)) - 1;
834       }
835       if(closed) *closed = 1;
836       goto finish;
837     } else {
838       if (index > data_len) {
839 #ifdef DEBUG
840         fprintf(stderr, "index told me to seek somehwere I can't\n");
841 #endif
842         RESTART;
843       }
844       data_off = index;
845     }
846   }
847
848   if (index_off > 0) {
849     /* We are adding onto a partial index so we must advance a record */
850     if (!jlog_file_pread(ctx->data, &logmhdr, sizeof(logmhdr), data_off))
851       SYS_FAIL(JLOG_ERR_FILE_READ);
852     if ((data_off += sizeof(logmhdr) + logmhdr.mlen) > data_len)
853       RESTART;
854   }
855
856   i = 0;
857   while (data_off + sizeof(logmhdr) <= data_len) {
858     off_t next_off = data_off;
859
860     if (!jlog_file_pread(ctx->data, &logmhdr, sizeof(logmhdr), data_off))
861       SYS_FAIL(JLOG_ERR_FILE_READ);
862     if (logmhdr.reserved != 0)
863       SYS_FAIL(JLOG_ERR_FILE_CORRUPT);
864     if ((next_off += sizeof(logmhdr) + logmhdr.mlen) > data_len)
865       break;
866
867     /* Write our new index offset */
868     indices[i++] = data_off;
869     if(i >= BUFFERED_INDICES) {
870 #ifdef DEBUG
871       fprintf(stderr, "writing %i offsets\n", i);
872 #endif
873       if (!jlog_file_pwrite(ctx->index, indices, i * sizeof(u_int64_t), index_off))
874         RESTART;
875       index_off += i * sizeof(u_int64_t);
876       i = 0;
877     }
878     data_off = next_off;
879   }
880   if(i > 0) {
881 #ifdef DEBUG
882     fprintf(stderr, "writing %i offsets\n", i);
883 #endif
884     if (!jlog_file_pwrite(ctx->index, indices, i * sizeof(u_int64_t), index_off))
885       RESTART;
886     index_off += i * sizeof(u_int64_t);
887   }
888   if(last) {
889     last->log = log;
890     last->marker = index_off / sizeof(u_int64_t);
891   }
892   if(log < ctx->storage.log) {
893     if (data_off != data_len) {
894 #ifdef DEBUG
895       fprintf(stderr, "closing index, but %llu != %llu\n", data_off, data_len);
896 #endif
897       SYS_FAIL(JLOG_ERR_FILE_CORRUPT);
898     }
899     /* Special case: if we are closing, we next write a '0'
900      * we can't write the closing marker if the data segment had no records
901      * in it, since it will be confused with an index to offset 0 by the
902      * next reader; this only happens when segments are repaired */
903     if (index_off) {
904       index = 0;
905       if (!jlog_file_pwrite(ctx->index, &index, sizeof(u_int64_t), index_off))
906         RESTART;
907     }
908     if(closed) *closed = 1;
909   }
910 #undef RESTART
911
912 finish:
913   jlog_file_unlock(ctx->index);
914 #ifdef DEBUG
915   fprintf(stderr, "index is %s\n", closed?(*closed?"closed":"open"):"unknown");
916 #endif
917   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
918   return -1;
919 }
920
921 static int __jlog_resync_index(jlog_ctx *ctx, u_int32_t log, jlog_id *last, int *closed) {
922   int attempts, rv = -1;
923   for(attempts=0; attempts<4; attempts++) {
924     rv = ___jlog_resync_index(ctx, log, last, closed);
925     if(ctx->last_error == JLOG_ERR_SUCCESS) break;
926     if(ctx->last_error == JLOG_ERR_FILE_OPEN ||
927        ctx->last_error == JLOG_ERR_IDX_OPEN) break;
928
929     /* We can't fix the file if someone may write to it again */
930     if(log >= ctx->storage.log) break;
931
932     jlog_file_lock(ctx->index);
933     /* it doesn't really matter what jlog_repair_datafile returns
934      * we'll keep retrying anyway */
935     jlog_repair_datafile(ctx, log);
936     jlog_file_truncate(ctx->index, 0);
937     jlog_file_unlock(ctx->index);
938   }
939   return rv;
940 }
941
942 jlog_ctx *jlog_new(const char *path) {
943   jlog_ctx *ctx;
944   ctx = calloc(1, sizeof(*ctx));
945   ctx->unit_limit = DEFAULT_UNIT_LIMIT;
946   ctx->file_mode = DEFAULT_FILE_MODE;
947   ctx->safety = DEFAULT_SAFETY;
948   ctx->context_mode = JLOG_NEW;
949   ctx->path = strdup(path);
950   return ctx;
951 }
952
953 void jlog_set_error_func(jlog_ctx *ctx, jlog_error_func Func, void *ptr) {
954   ctx->error_func = Func;
955   ctx->error_ctx = ptr;
956 }
957
958 size_t jlog_raw_size(jlog_ctx *ctx) {
959   DIR *d;
960   struct dirent *de;
961   size_t totalsize = 0;
962   int ferr, len;
963   char filename[MAXPATHLEN];
964
965   d = opendir(ctx->path);
966   if(!d) return 0;
967   len = strlen(ctx->path);
968   memcpy(filename, ctx->path, len);
969   filename[len++] = IFS_CH;
970   while((de = readdir(d)) != NULL) {
971     struct stat sb;
972     strcpy(filename+len, de->d_name);
973     while((ferr = stat(filename, &sb)) == -1 && errno == EINTR);
974     if(ferr == 0 && S_ISREG(sb.st_mode)) totalsize += sb.st_size;
975   }
976   closedir(d);
977   return totalsize;
978 }
979
980 const char *jlog_ctx_err_string(jlog_ctx *ctx) {
981   switch (ctx->last_error) {
982 #define MSG_O_MATIC(x)  case x: return #x;
983     MSG_O_MATIC( JLOG_ERR_SUCCESS);
984     MSG_O_MATIC( JLOG_ERR_ILLEGAL_INIT);
985     MSG_O_MATIC( JLOG_ERR_ILLEGAL_OPEN);
986     MSG_O_MATIC( JLOG_ERR_OPEN);
987     MSG_O_MATIC( JLOG_ERR_NOTDIR);
988     MSG_O_MATIC( JLOG_ERR_CREATE_PATHLEN);
989     MSG_O_MATIC( JLOG_ERR_CREATE_EXISTS);
990     MSG_O_MATIC( JLOG_ERR_CREATE_MKDIR);
991     MSG_O_MATIC( JLOG_ERR_CREATE_META);
992     MSG_O_MATIC( JLOG_ERR_LOCK);
993     MSG_O_MATIC( JLOG_ERR_IDX_OPEN);
994     MSG_O_MATIC( JLOG_ERR_IDX_SEEK);
995     MSG_O_MATIC( JLOG_ERR_IDX_CORRUPT);
996     MSG_O_MATIC( JLOG_ERR_IDX_WRITE);
997     MSG_O_MATIC( JLOG_ERR_IDX_READ);
998     MSG_O_MATIC( JLOG_ERR_FILE_OPEN);
999     MSG_O_MATIC( JLOG_ERR_FILE_SEEK);
1000     MSG_O_MATIC( JLOG_ERR_FILE_CORRUPT);
1001     MSG_O_MATIC( JLOG_ERR_FILE_READ);
1002     MSG_O_MATIC( JLOG_ERR_FILE_WRITE);
1003     MSG_O_MATIC( JLOG_ERR_META_OPEN);
1004     MSG_O_MATIC( JLOG_ERR_ILLEGAL_WRITE);
1005     MSG_O_MATIC( JLOG_ERR_ILLEGAL_CHECKPOINT);
1006     MSG_O_MATIC( JLOG_ERR_INVALID_SUBSCRIBER);
1007     MSG_O_MATIC( JLOG_ERR_ILLEGAL_LOGID);
1008     MSG_O_MATIC( JLOG_ERR_SUBSCRIBER_EXISTS);
1009     MSG_O_MATIC( JLOG_ERR_CHECKPOINT);
1010     MSG_O_MATIC( JLOG_ERR_NOT_SUPPORTED);
1011     default: return "Unknown";
1012   }
1013 }
1014
1015 int jlog_ctx_err(jlog_ctx *ctx) {
1016   return ctx->last_error;
1017 }
1018
1019 int jlog_ctx_errno(jlog_ctx *ctx) {
1020   return ctx->last_errno;
1021 }
1022
1023 int jlog_ctx_alter_safety(jlog_ctx *ctx, jlog_safety safety) {
1024   if(ctx->context_mode != JLOG_NEW) return -1;
1025   ctx->safety = safety;
1026   return 0;
1027 }
1028 int jlog_ctx_alter_journal_size(jlog_ctx *ctx, size_t size) {
1029   if(ctx->context_mode != JLOG_NEW) return -1;
1030   ctx->unit_limit = size;
1031   return 0;
1032 }
1033 int jlog_ctx_alter_mode(jlog_ctx *ctx, int mode) {
1034   ctx->file_mode = mode;
1035   return 0;
1036 }
1037 int jlog_ctx_open_writer(jlog_ctx *ctx) {
1038   int rv;
1039   struct stat sb;
1040
1041   ctx->last_error = JLOG_ERR_SUCCESS;
1042   if(ctx->context_mode != JLOG_NEW) {
1043     ctx->last_error = JLOG_ERR_ILLEGAL_OPEN;
1044     return -1;
1045   }
1046   ctx->context_mode = JLOG_APPEND;
1047   while((rv = stat(ctx->path, &sb)) == -1 && errno == EINTR);
1048   if(rv == -1) SYS_FAIL(JLOG_ERR_OPEN);
1049   if(!S_ISDIR(sb.st_mode)) SYS_FAIL(JLOG_ERR_NOTDIR);
1050   if(__jlog_open_metastore(ctx) != 0) SYS_FAIL(JLOG_ERR_META_OPEN);
1051   if(__jlog_restore_metastore(ctx, 0)) SYS_FAIL(JLOG_ERR_META_OPEN);
1052  finish:
1053   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1054   ctx->context_mode = JLOG_INVALID;
1055   return -1;
1056 }
1057 int jlog_ctx_open_reader(jlog_ctx *ctx, const char *subscriber) {
1058   int rv;
1059   struct stat sb;
1060   jlog_id dummy;
1061
1062   ctx->last_error = JLOG_ERR_SUCCESS;
1063   if(ctx->context_mode != JLOG_NEW) {
1064     ctx->last_error = JLOG_ERR_ILLEGAL_OPEN;
1065     return -1;
1066   }
1067   ctx->context_mode = JLOG_READ;
1068   ctx->subscriber_name = strdup(subscriber);
1069   while((rv = stat(ctx->path, &sb)) == -1 && errno == EINTR);
1070   if(rv == -1) SYS_FAIL(JLOG_ERR_OPEN);
1071   if(!S_ISDIR(sb.st_mode)) SYS_FAIL(JLOG_ERR_NOTDIR);
1072   if(__jlog_open_metastore(ctx) != 0) SYS_FAIL(JLOG_ERR_META_OPEN);
1073   if(jlog_get_checkpoint(ctx, ctx->subscriber_name, &dummy))
1074     SYS_FAIL(JLOG_ERR_INVALID_SUBSCRIBER);
1075   if(__jlog_restore_metastore(ctx, 0)) SYS_FAIL(JLOG_ERR_META_OPEN);
1076  finish:
1077   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1078   ctx->context_mode = JLOG_INVALID;
1079   return -1;
1080 }
1081 int jlog_ctx_init(jlog_ctx *ctx) {
1082   int rv;
1083   struct stat sb;
1084   int dirmode;
1085
1086   ctx->last_error = JLOG_ERR_SUCCESS;
1087   if(strlen(ctx->path) > MAXLOGPATHLEN-1) {
1088     ctx->last_error = JLOG_ERR_CREATE_PATHLEN;
1089     return -1;
1090   }
1091   if(ctx->context_mode != JLOG_NEW) {
1092     ctx->last_error = JLOG_ERR_ILLEGAL_INIT;
1093     return -1;
1094   }
1095   ctx->context_mode = JLOG_INIT;
1096   while((rv = stat(ctx->path, &sb)) == -1 && errno == EINTR);
1097   if(rv == 0 || errno != ENOENT) {
1098     SYS_FAIL_EX(JLOG_ERR_CREATE_EXISTS, 0);
1099   }
1100   dirmode = ctx->file_mode;
1101   if(dirmode & 0400) dirmode |= 0100;
1102   if(dirmode & 040) dirmode |= 010;
1103   if(dirmode & 04) dirmode |= 01;
1104   if(mkdir(ctx->path, dirmode) == -1)
1105     SYS_FAIL(JLOG_ERR_CREATE_MKDIR);
1106   chmod(ctx->path, dirmode);
1107   /* Setup our initial state and store our instance metadata */
1108   if(__jlog_open_metastore(ctx) != 0)
1109     SYS_FAIL(JLOG_ERR_CREATE_META);
1110   if(__jlog_save_metastore(ctx, 0) != 0)
1111     SYS_FAIL(JLOG_ERR_CREATE_META);
1112  finish:
1113   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1114   return -1;
1115 }
1116 int jlog_ctx_close(jlog_ctx *ctx) {
1117   __jlog_close_indexer(ctx);
1118   __jlog_close_reader(ctx);
1119   __jlog_close_metastore(ctx);
1120   __jlog_close_checkpoint(ctx);
1121   if(ctx->subscriber_name) free(ctx->subscriber_name);
1122   if(ctx->path) free(ctx->path);
1123   free(ctx);
1124   return 0;
1125 }
1126
1127 static int __jlog_metastore_atomic_increment(jlog_ctx *ctx) {
1128   u_int32_t saved_storage_log = ctx->storage.log;
1129 #ifdef DEBUG
1130   fprintf(stderr, "atomic increment on %u\n", saved_storage_log);
1131 #endif
1132   if (!jlog_file_lock(ctx->metastore))
1133     SYS_FAIL(JLOG_ERR_LOCK);
1134   if(__jlog_restore_metastore(ctx, 1))
1135     SYS_FAIL(JLOG_ERR_META_OPEN);
1136   if(ctx->storage.log == saved_storage_log) {
1137     /* We're the first ones to it, so we get to increment it */
1138     ctx->storage.log++;
1139     if(__jlog_save_metastore(ctx, 1))
1140       SYS_FAIL(JLOG_ERR_META_OPEN);
1141   }
1142  finish:
1143   jlog_file_unlock(ctx->metastore);
1144   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1145   return -1;
1146 }
1147 int jlog_ctx_write_message(jlog_ctx *ctx, jlog_message *mess, struct timeval *when) {
1148   struct timeval now;
1149   jlog_message_header hdr;
1150   off_t current_offset;
1151
1152   ctx->last_error = JLOG_ERR_SUCCESS;
1153   if(ctx->context_mode != JLOG_APPEND) {
1154     ctx->last_error = JLOG_ERR_ILLEGAL_WRITE;
1155     ctx->last_errno = EPERM;
1156     return -1;
1157   }
1158  begin:
1159   __jlog_open_writer(ctx);
1160   if(!ctx->data) {
1161     ctx->last_error = JLOG_ERR_FILE_OPEN;
1162     ctx->last_errno = errno;
1163     return -1;
1164   }
1165   if (!jlog_file_lock(ctx->data)) {
1166     ctx->last_error = JLOG_ERR_LOCK;
1167     ctx->last_errno = errno;
1168     return -1;
1169   }
1170
1171   if ((current_offset = jlog_file_size(ctx->data)) == -1)
1172     SYS_FAIL(JLOG_ERR_FILE_SEEK);
1173   if(ctx->unit_limit <= current_offset) {
1174     jlog_file_unlock(ctx->data);
1175     __jlog_close_writer(ctx);
1176     __jlog_metastore_atomic_increment(ctx);
1177     goto begin;
1178   }
1179
1180   hdr.reserved = 0;
1181   if (when) {
1182     hdr.tv_sec = when->tv_sec;
1183     hdr.tv_usec = when->tv_usec;
1184   } else {
1185     gettimeofday(&now, NULL);
1186     hdr.tv_sec = now.tv_sec;
1187     hdr.tv_usec = now.tv_usec;
1188   }
1189   hdr.mlen = mess->mess_len;
1190   if (!jlog_file_pwrite(ctx->data, &hdr, sizeof(hdr), current_offset))
1191     SYS_FAIL(JLOG_ERR_FILE_WRITE);
1192   current_offset += sizeof(hdr);
1193   if (!jlog_file_pwrite(ctx->data, mess->mess, mess->mess_len, current_offset))
1194     SYS_FAIL(JLOG_ERR_FILE_WRITE);
1195   current_offset += mess->mess_len;
1196
1197   if(ctx->unit_limit <= current_offset) {
1198     jlog_file_unlock(ctx->data);
1199     __jlog_close_writer(ctx);
1200     __jlog_metastore_atomic_increment(ctx);
1201     return 0;
1202   }
1203  finish:
1204   jlog_file_unlock(ctx->data);
1205   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1206   return -1;
1207 }
1208 int jlog_ctx_read_checkpoint(jlog_ctx *ctx, const jlog_id *chkpt) {
1209   ctx->last_error = JLOG_ERR_SUCCESS;
1210  
1211   if(ctx->context_mode != JLOG_READ) {
1212     ctx->last_error = JLOG_ERR_ILLEGAL_CHECKPOINT;
1213     ctx->last_errno = EPERM;
1214     return -1;
1215   }
1216   if(__jlog_set_checkpoint(ctx, ctx->subscriber_name, chkpt) != 0) {
1217     ctx->last_error = JLOG_ERR_CHECKPOINT;
1218     ctx->last_errno = 0;
1219     return -1;
1220   }
1221   return 0;
1222 }
1223
1224 int jlog_ctx_remove_subscriber(jlog_ctx *ctx, const char *s) {
1225   char name[MAXPATHLEN];
1226   int rv;
1227
1228   compute_checkpoint_filename(ctx, s, name);
1229   rv = unlink(name);
1230
1231   if (rv == 0) {
1232     ctx->last_error = JLOG_ERR_SUCCESS;
1233     return 1;
1234   }
1235   if (errno == ENOENT) {
1236     ctx->last_error = JLOG_ERR_INVALID_SUBSCRIBER;
1237     return 0;
1238   }
1239   return -1;
1240 }
1241
1242 int jlog_ctx_add_subscriber(jlog_ctx *ctx, const char *s, jlog_position whence) {
1243   jlog_id chkpt;
1244   jlog_file *jchkpt;
1245   ctx->last_error = JLOG_ERR_SUCCESS;
1246
1247   jchkpt = __jlog_open_named_checkpoint(ctx, s, O_CREAT|O_EXCL);
1248   if(!jchkpt) {
1249     ctx->last_error = JLOG_ERR_SUBSCRIBER_EXISTS;
1250     ctx->last_errno = EEXIST;
1251     return -1;
1252   }
1253   jlog_file_close(jchkpt);
1254  
1255   if(whence == JLOG_BEGIN) {
1256     memset(&chkpt, 0, sizeof(chkpt));
1257     if(__jlog_set_checkpoint(ctx, s, &chkpt) != 0) {
1258       ctx->last_error = JLOG_ERR_CHECKPOINT;
1259       ctx->last_errno = 0;
1260       return -1;
1261     }
1262     return 0;
1263   }
1264   ctx->last_error = JLOG_ERR_NOT_SUPPORTED;
1265   return -1;
1266 }
1267
1268 int jlog_ctx_write(jlog_ctx *ctx, const void *data, size_t len) {
1269   jlog_message m;
1270   m.mess = (void *)data;
1271   m.mess_len = len;
1272   return jlog_ctx_write_message(ctx, &m, NULL);
1273 }
1274
1275 static int __jlog_find_first_log_after(jlog_ctx *ctx, jlog_id *chkpt,
1276                                 jlog_id *start, jlog_id *finish) {
1277   jlog_id last;
1278   int closed;
1279  
1280   memcpy(start, chkpt, sizeof(*chkpt));
1281  attempt:
1282   if(__jlog_resync_index(ctx, start->log, &last, &closed) != 0) {
1283     if(ctx->last_error == JLOG_ERR_FILE_OPEN &&
1284        ctx->last_errno == ENOENT) {
1285       /* That file doesn't exist... bad, but we can fake a recovery by
1286          advancing the next file that does exist */
1287       ctx->last_error = JLOG_ERR_SUCCESS;
1288       if(start->log >= ctx->storage.log) {
1289         /* We don't advance past where people are writing */
1290         memcpy(finish, start, sizeof(*start));
1291         return 0;
1292       }
1293       start->marker = 0;
1294       start->log++;  /* BE SMARTER! */
1295       goto attempt;
1296     }
1297     return -1; /* Just persist resync's error state */
1298   }
1299
1300   /* If someone checkpoints off the end, be nice */
1301   if(last.log == start->log && last.marker < start->marker)
1302     memcpy(start, &last, sizeof(*start));
1303
1304   if(!memcmp(start, &last, sizeof(last)) && closed) {
1305     /* the chkpt is the last of the index and the index is closed...
1306        next please */
1307 #ifdef DEBUG
1308     fprintf(stderr, "checkpoint at end of file... advancing [%08x]\n",
1309             start->log+1);
1310 #endif
1311     if(start->log >= ctx->storage.log) {
1312       /* We don't advance past where people are writing */
1313       memcpy(finish, start, sizeof(*start));
1314       return 0;
1315     }
1316     start->marker = 0;
1317     start->log++;
1318     goto attempt;
1319   }
1320   memcpy(finish, &last, sizeof(last));
1321   return 0;
1322 }
1323 int jlog_ctx_read_message(jlog_ctx *ctx, const jlog_id *id, jlog_message *m) {
1324   off_t index_len;
1325   u_int64_t data_off;
1326
1327   ctx->last_error = JLOG_ERR_SUCCESS;
1328   if (ctx->context_mode != JLOG_READ)
1329     SYS_FAIL(JLOG_ERR_ILLEGAL_WRITE);
1330   if (id->marker < 1)
1331     SYS_FAIL(JLOG_ERR_ILLEGAL_LOGID);
1332
1333   __jlog_open_reader(ctx, id->log);
1334   if(!ctx->data)
1335     SYS_FAIL(JLOG_ERR_FILE_OPEN);
1336   __jlog_open_indexer(ctx, id->log);
1337   if(!ctx->index)
1338     SYS_FAIL(JLOG_ERR_IDX_OPEN);
1339
1340   if ((index_len = jlog_file_size(ctx->index)) == -1)
1341     SYS_FAIL(JLOG_ERR_IDX_SEEK);
1342   if (index_len % sizeof(u_int64_t))
1343     SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
1344   if (id->marker * sizeof(u_int64_t) > index_len)
1345     SYS_FAIL(JLOG_ERR_ILLEGAL_LOGID);
1346
1347   if (!jlog_file_pread(ctx->index, &data_off, sizeof(u_int64_t),
1348                        (id->marker - 1) * sizeof(u_int64_t)))
1349   {
1350     SYS_FAIL(JLOG_ERR_IDX_READ);
1351   }
1352   if (data_off == 0 && id->marker != 1) {
1353     if (id->marker * sizeof(u_int64_t) == index_len) {
1354       /* close tag; not a real offset */
1355       SYS_FAIL(JLOG_ERR_ILLEGAL_LOGID);
1356     } else {
1357       /* an offset of 0 in the middle of an index means curruption */
1358       SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
1359     }
1360   }
1361
1362   if(__jlog_mmap_reader(ctx, id->log) != 0)
1363     SYS_FAIL(JLOG_ERR_FILE_READ);
1364
1365   if(data_off > ctx->mmap_len - sizeof(jlog_message_header)) {
1366 #ifdef DEBUG
1367     fprintf(stderr, "read idx off end: %llu\n", data_off);
1368 #endif
1369     SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
1370   }
1371
1372   memcpy(&m->aligned_header, ((u_int8_t *)ctx->mmap_base) + data_off,
1373          sizeof(jlog_message_header));
1374
1375   if(data_off + sizeof(jlog_message_header) + m->aligned_header.mlen > ctx->mmap_len) {
1376 #ifdef DEBUG
1377     fprintf(stderr, "read idx off end: %llu %llu\n", data_off, ctx->mmap_len);
1378 #endif
1379     SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
1380   }
1381
1382   m->header = &m->aligned_header;
1383   m->mess_len = m->header->mlen;
1384   m->mess = (((u_int8_t *)ctx->mmap_base) + data_off + sizeof(jlog_message_header));
1385
1386  finish:
1387   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1388   if (ctx->last_error == JLOG_ERR_IDX_CORRUPT) {
1389     if (jlog_file_lock(ctx->index)) {
1390       jlog_file_truncate(ctx->index, 0);
1391       jlog_file_unlock(ctx->index);
1392     }
1393   }
1394   return -1;
1395 }
1396 int jlog_ctx_read_interval(jlog_ctx *ctx, jlog_id *start, jlog_id *finish) {
1397   jlog_id chkpt;
1398   int count = 0;
1399
1400   ctx->last_error = JLOG_ERR_SUCCESS;
1401   if(ctx->context_mode != JLOG_READ) {
1402     ctx->last_error = JLOG_ERR_ILLEGAL_WRITE;
1403     ctx->last_errno = EPERM;
1404     return -1;
1405   }
1406
1407   __jlog_restore_metastore(ctx, 0);
1408   if(jlog_get_checkpoint(ctx, ctx->subscriber_name, &chkpt))
1409     SYS_FAIL(JLOG_ERR_INVALID_SUBSCRIBER);
1410   if(__jlog_find_first_log_after(ctx, &chkpt, start, finish) != 0)
1411     goto finish; /* Leave whatever error was set in find_first_log_after */
1412   if(start->log != chkpt.log) start->marker = 0;
1413   else start->marker = chkpt.marker;
1414   if(start->log != chkpt.log) {
1415     /* We've advanced our checkpoint, let's not do this work again */
1416     if(__jlog_set_checkpoint(ctx, ctx->subscriber_name, start) != 0)
1417       SYS_FAIL(JLOG_ERR_CHECKPOINT);
1418   }
1419   /* Here 'start' is actually the checkpoint, so we must advance it one.
1420      However, that may not be possible, if there are no messages, so first
1421      make sure finish is bigger */
1422   count = finish->marker - start->marker;
1423   if(finish->marker > start->marker) start->marker++;
1424
1425   /* We need to munmap it, so that we can remap it with more data if needed */
1426   __jlog_munmap_reader(ctx);
1427  finish:
1428   if(ctx->last_error == JLOG_ERR_SUCCESS) return count;
1429   return -1;
1430 }
1431
1432 int jlog_ctx_last_log_id(jlog_ctx *ctx, jlog_id *id) {
1433   ctx->last_error = JLOG_ERR_SUCCESS;
1434   if(ctx->context_mode != JLOG_READ) {
1435     ctx->last_error = JLOG_ERR_ILLEGAL_WRITE;
1436     ctx->last_errno = EPERM;
1437     return -1;
1438   }
1439   if (__jlog_restore_metastore(ctx, 0) != 0) return -1;
1440   ___jlog_resync_index(ctx, ctx->storage.log, id, NULL);
1441   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1442   return -1;
1443 }
1444
1445 /* vim:se ts=2 sw=2 et: */
Note: See TracBrowser for help on using the browser.