root/src/jlog/jlog.c

Revision cc981f5c55100675865b6960135d70148d8585af, 42.9 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 3 years ago)

pull in jlog @ 52

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2005-2008, Message Systems, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *    * Redistributions of source code must retain the above copyright
10  *      notice, this list of conditions and the following disclaimer.
11  *    * Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials provided
14  *      with the distribution.
15  *    * Neither the name Message Systems, Inc. nor the names
16  *      of its contributors may be used to endorse or promote products
17  *      derived from this software without specific prior written
18  *      permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 /*****************************************************************
34
35   Journaled logging... append only.
36
37       (1) find current file, or allocate a file, extendible and mark
38           it current.
39  
40       (2) Write records to it, records include their size, so
41           a simple inspection can detect an incomplete trailing
42           record.
43    
44       (3) Write append until the file reaches a certain size.
45
46       (4) Allocate a file, extensible.
47
48       (5) RESYNC INDEX on 'finished' file (see reading:3) and postpend
49           an offset '0' to the index.
50    
51       (6) goto (1)
52    
53   Reading journals...
54
55       (1) find oldest checkpoint of all subscribers, remove all older files.
56
57       (2) (file, last_read) = find_checkpoint for this subscriber
58
59       (3) RESYNC INDEX:
60           open record index for file, seek to the end -  off_t.
61           this is the offset of the last noticed record in this file.
62           open file, seek to this point, roll forward writing the index file
63           _do not_ write an offset for the last record unless it is found
64           complete.
65
66       (4) read entries from last_read+1 -> index of record index
67
68 */
69 #include <stdio.h>
70
71 #include "jlog_config.h"
72 #include "jlog_private.h"
73 #if HAVE_UNISTD_H
74 #include <unistd.h>
75 #endif
76 #if HAVE_SYS_TIME_H
77 #include <sys/time.h>
78 #endif
79 #if HAVE_DIRENT_H
80 #include <dirent.h>
81 #endif
82 #if HAVE_FCNTL_H
83 #include <fcntl.h>
84 #endif
85 #if HAVE_ERRNO_H
86 #include <errno.h>
87 #endif
88 #if HAVE_TIME_H
89 #include <time.h>
90 #endif
91 #if HAVE_SYS_MMAN_H
92 #include <sys/mman.h>
93 #endif
94
95 #define BUFFERED_INDICES 1024
96
97 static jlog_file *__jlog_open_writer(jlog_ctx *ctx);
98 static int __jlog_close_writer(jlog_ctx *ctx);
99 static jlog_file *__jlog_open_reader(jlog_ctx *ctx, u_int32_t log);
100 static int __jlog_close_reader(jlog_ctx *ctx);
101 static int __jlog_close_checkpoint(jlog_ctx *ctx);
102 static jlog_file *__jlog_open_indexer(jlog_ctx *ctx, u_int32_t log);
103 static int __jlog_close_indexer(jlog_ctx *ctx);
104 static int __jlog_resync_index(jlog_ctx *ctx, u_int32_t log, jlog_id *last, int *c);
105 static jlog_file *__jlog_open_named_checkpoint(jlog_ctx *ctx, const char *cpname, int flags);
106 static int __jlog_mmap_reader(jlog_ctx *ctx, u_int32_t log);
107 static int __jlog_munmap_reader(jlog_ctx *ctx);
108
109 int jlog_snprint_logid(char *b, int n, const jlog_id *id) {
110   return snprintf(b, n, "%08x:%08x", id->log, id->marker);
111 }
112
113 int jlog_repair_datafile(jlog_ctx *ctx, u_int32_t log)
114 {
115   jlog_message_header hdr;
116   char *this, *next, *afternext = NULL, *mmap_end;
117   int i, invalid_count = 0;
118   struct {
119     off_t start, end;
120   } *invalid = NULL;
121   off_t orig_len, src, dst, len;
122
123 #define TAG_INVALID(s, e) do { \
124   if (invalid_count) \
125     invalid = realloc(invalid, (invalid_count + 1) * sizeof(*invalid)); \
126   else \
127     invalid = malloc(sizeof(*invalid)); \
128   invalid[invalid_count].start = s - (char *)ctx->mmap_base; \
129   invalid[invalid_count].end = e - (char *)ctx->mmap_base; \
130   invalid_count++; \
131 } while (0)
132
133   ctx->last_error = JLOG_ERR_SUCCESS;
134
135   /* we want the reader's open logic because this runs in the read path
136    * the underlying fds are always RDWR anyway */
137   __jlog_open_reader(ctx, log);
138   if (!ctx->data) {
139     ctx->last_error = JLOG_ERR_FILE_OPEN;
140     ctx->last_errno = errno;
141     return -1;
142   }
143   if (!jlog_file_lock(ctx->data)) {
144     ctx->last_error = JLOG_ERR_LOCK;
145     ctx->last_errno = errno;
146     return -1;
147   }
148   if (__jlog_mmap_reader(ctx, log) != 0)
149     SYS_FAIL(JLOG_ERR_FILE_READ);
150
151   orig_len = ctx->mmap_len;
152   mmap_end = (char*)ctx->mmap_base + ctx->mmap_len;
153   /* these values will cause us to fall right into the error clause and
154    * start searching for a valid header from offset 0 */
155   this = (char*)ctx->mmap_base - sizeof(hdr);
156   hdr.reserved = 0;
157   hdr.mlen = 0;
158
159   while (this + sizeof(hdr) <= mmap_end) {
160     next = this + sizeof(hdr) + hdr.mlen;
161     if (next <= (char *)ctx->mmap_base) goto error;
162     if (next == mmap_end) {
163       this = next;
164       break;
165     }
166     if (next + sizeof(hdr) > mmap_end) goto error;
167     memcpy(&hdr, next, sizeof(hdr));
168     if (hdr.reserved != 0) goto error;
169     this = next;
170     continue;
171   error:
172     for (next = this + sizeof(hdr); next + sizeof(hdr) <= mmap_end; next++) {
173       if (!next[0] && !next[1] && !next[2] && !next[3]) {
174         memcpy(&hdr, next, sizeof(hdr));
175         afternext = next + sizeof(hdr) + hdr.mlen;
176         if (afternext <= (char *)ctx->mmap_base) continue;
177         if (afternext == mmap_end) break;
178         if (afternext + sizeof(hdr) > mmap_end) continue;
179         memcpy(&hdr, afternext, sizeof(hdr));
180         if (hdr.reserved == 0) break;
181       }
182     }
183     /* correct for while loop entry condition */
184     if (this < (char *)ctx->mmap_base) this = ctx->mmap_base;
185     if (next + sizeof(hdr) > mmap_end) break;
186     if (next > this) TAG_INVALID(this, next);
187     this = afternext;
188   }
189   if (this != mmap_end) TAG_INVALID(this, mmap_end);
190
191 #undef TAG_INVALID
192
193 #define MOVE_SEGMENT do { \
194   char cpbuff[4096]; \
195   off_t chunk; \
196   while(len > 0) { \
197     chunk = len; \
198     if (chunk > sizeof(cpbuff)) chunk = sizeof(cpbuff); \
199     if (!jlog_file_pread(ctx->data, &cpbuff, chunk, src)) \
200       SYS_FAIL(JLOG_ERR_FILE_READ); \
201     if (!jlog_file_pwrite(ctx->data, &cpbuff, chunk, dst)) \
202       SYS_FAIL(JLOG_ERR_FILE_WRITE); \
203     src += chunk; \
204     dst += chunk; \
205     len -= chunk; \
206   } \
207 } while (0)
208
209   if (invalid_count > 0) {
210     __jlog_munmap_reader(ctx);
211     dst = invalid[0].start;
212     for (i = 0; i < invalid_count - 1; ) {
213       src = invalid[i].end;
214       len = invalid[++i].start - src;
215       MOVE_SEGMENT;
216     }
217     src = invalid[invalid_count - 1].end;
218     len = orig_len - src;
219     if (len > 0) MOVE_SEGMENT;
220     if (!jlog_file_truncate(ctx->data, dst))
221       SYS_FAIL(JLOG_ERR_FILE_WRITE);
222   }
223
224 #undef MOVE_SEGMENT
225
226 finish:
227   jlog_file_unlock(ctx->data);
228   if (invalid) free(invalid);
229   if (ctx->last_error != JLOG_ERR_SUCCESS) return -1;
230   return invalid_count;
231 }
232
233 int jlog_inspect_datafile(jlog_ctx *ctx, u_int32_t log)
234 {
235   jlog_message_header hdr;
236   char *this, *next, *mmap_end;
237   int i;
238   time_t timet;
239   struct tm tm;
240   char tbuff[128];
241
242   ctx->last_error = JLOG_ERR_SUCCESS;
243
244   __jlog_open_reader(ctx, log);
245   if (!ctx->data)
246     SYS_FAIL(JLOG_ERR_FILE_OPEN);
247   if (__jlog_mmap_reader(ctx, log) != 0)
248     SYS_FAIL(JLOG_ERR_FILE_READ);
249
250   mmap_end = (char*)ctx->mmap_base + ctx->mmap_len;
251   this = ctx->mmap_base;
252   i = 0;
253   while (this + sizeof(hdr) <= mmap_end) {
254     memcpy(&hdr, this, sizeof(hdr));
255     i++;
256     if (hdr.reserved != 0) {
257       fprintf(stderr, "Message %d at [%ld] has invalid reserved value %u\n",
258               i, (long int)(this - (char *)ctx->mmap_base), hdr.reserved);
259       return 1;
260     }
261
262     fprintf(stderr, "Message %d at [%ld] of (%lu+%u)", i,
263             (long int)(this - (char *)ctx->mmap_base),
264             (long unsigned int)sizeof(hdr), hdr.mlen);
265
266     next = this + sizeof(hdr) + hdr.mlen;
267     if (next <= (char *)ctx->mmap_base) {
268       fprintf(stderr, " WRAPPED TO NEGATIVE OFFSET!\n");
269       return 1;
270     }
271     if (next > mmap_end) {
272       fprintf(stderr, " OFF THE END!\n");
273       return 1;
274     }
275
276     timet = hdr.tv_sec;
277     localtime_r(&timet, &tm);
278     strftime(tbuff, sizeof(tbuff), "%c", &tm);
279     fprintf(stderr, "\n\ttime: %s\n\tmlen: %u\n", tbuff, hdr.mlen);
280     this = next;
281   }
282   if (this < mmap_end) {
283     fprintf(stderr, "%ld bytes of junk at the end\n",
284             (long int)(mmap_end - this));
285     return 1;
286   }
287
288   return 0;
289 finish:
290   return -1;
291 }
292
293 int jlog_idx_details(jlog_ctx *ctx, u_int32_t log,
294                      u_int32_t *marker, int *closed)
295 {
296   off_t index_len;
297   u_int64_t index;
298
299   __jlog_open_indexer(ctx, log);
300   if (!ctx->index)
301     SYS_FAIL(JLOG_ERR_IDX_OPEN);
302   if ((index_len = jlog_file_size(ctx->index)) == -1)
303     SYS_FAIL(JLOG_ERR_IDX_SEEK);
304   if (index_len % sizeof(u_int64_t))
305     SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
306   if (index_len > sizeof(u_int64_t)) {
307     if (!jlog_file_pread(ctx->index, &index, sizeof(u_int64_t),
308                          index_len - sizeof(u_int64_t)))
309     {
310       SYS_FAIL(JLOG_ERR_IDX_READ);
311     }
312     if (index) {
313       *marker = index_len / sizeof(u_int64_t);
314       *closed = 0;
315     } else {
316       *marker = (index_len / sizeof(u_int64_t)) - 1;
317       *closed = 1;
318     }
319   } else {
320     *marker = index_len / sizeof(u_int64_t);
321     *closed = 0;
322   }
323
324   return 0;
325 finish:
326   return -1;
327 }
328
329 static int __jlog_unlink_datafile(jlog_ctx *ctx, u_int32_t log) {
330   char file[MAXPATHLEN];
331   int len;
332
333   if(ctx->current_log == log) {
334     __jlog_close_reader(ctx);
335     __jlog_close_indexer(ctx);
336   }
337
338   STRSETDATAFILE(ctx, file, log);
339 #ifdef DEBUG
340   fprintf(stderr, "unlinking %s\n", file);
341 #endif
342   unlink(file);
343
344   len = strlen(file);
345   if((len + sizeof(INDEX_EXT)) > sizeof(file)) return -1;
346   memcpy(file + len, INDEX_EXT, sizeof(INDEX_EXT));
347 #ifdef DEBUG
348   fprintf(stderr, "unlinking %s\n", file);
349 #endif
350   unlink(file);
351   return 0;
352 }
353
354 static int __jlog_open_metastore(jlog_ctx *ctx)
355 {
356   char file[MAXPATHLEN];
357   int len;
358
359 #ifdef DEBUG
360   fprintf(stderr, "__jlog_open_metastore\n");
361 #endif
362   len = strlen(ctx->path);
363   if((len + 1 /* IFS_CH */ + 9 /* "metastore" */ + 1) > MAXPATHLEN) {
364 #ifdef ENAMETOOLONG
365     ctx->last_errno = ENAMETOOLONG;
366 #endif
367     ctx->last_error = JLOG_ERR_CREATE_META;
368     return -1;
369   }
370   memcpy(file, ctx->path, len);
371   file[len++] = IFS_CH;
372   memcpy(&file[len], "metastore", 10); /* "metastore" + '\0' */
373
374   ctx->metastore = jlog_file_open(file, O_CREAT, ctx->file_mode);
375
376   if (!ctx->metastore) {
377     ctx->last_errno = errno;
378     ctx->last_error = JLOG_ERR_CREATE_META;
379     return -1;
380   }
381
382   return 0;
383 }
384
385 /* exported */
386 int __jlog_pending_readers(jlog_ctx *ctx, u_int32_t log) {
387   int readers;
388   DIR *dir;
389   struct dirent *ent;
390   char file[MAXPATHLEN];
391   int len;
392   jlog_id id;
393
394   readers = 0;
395
396   dir = opendir(ctx->path);
397   if (!dir) return -1;
398  
399   len = strlen(ctx->path);
400   if(len + 2 > sizeof(file)) return -1;
401   memcpy(file, ctx->path, len);
402   file[len++] = IFS_CH;
403   file[len] = '\0';
404
405   while ((ent = readdir(dir))) {
406     if (ent->d_name[0] == 'c' && ent->d_name[1] == 'p' && ent->d_name[2] == '.') {
407       jlog_file *cp;
408       int dlen;
409
410       dlen = strlen(ent->d_name);
411       if((len + dlen + 1) > sizeof(file)) continue;
412       memcpy(file + len, ent->d_name, dlen + 1); /* include \0 */
413 #ifdef DEBUG
414       fprintf(stderr, "Checking if %s needs %s...\n", ent->d_name, ctx->path);
415 #endif
416       if ((cp = jlog_file_open(file, 0, ctx->file_mode))) {
417         if (jlog_file_lock(cp)) {
418           jlog_file_pread(cp, &id, sizeof(id), 0);
419 #ifdef DEBUG
420           fprintf(stderr, "\t%u <= %u (pending reader)\n", id.log, log);
421 #endif
422           if (id.log <= log) {
423             readers++;
424           }
425           jlog_file_unlock(cp);
426         }
427         jlog_file_close(cp);
428       }
429     }
430   }
431   closedir(dir);
432   return readers;
433 }
/* Growable list of subscriber names built by jlog_ctx_list_subscribers. */
struct _jlog_subs {
  char **subs;   /* array of name strings */
  int used;      /* number of names stored so far */
  int allocd;    /* number of slots allocated in subs */
};
439
440 int jlog_ctx_list_subscribers_dispose(jlog_ctx *ctx, char **subs) {
441   char *s;
442   int i = 0;
443   if(subs) {
444     while((s = subs[i++]) != NULL) free(s);
445     free(subs);
446   }
447   return 0;
448 }
449
450 int jlog_ctx_list_subscribers(jlog_ctx *ctx, char ***subs) {
451   struct _jlog_subs js = { NULL, 0, 0 };
452   DIR *dir;
453   struct dirent *ent;
454   unsigned char file[MAXPATHLEN];
455   char *p;
456   int len;
457
458   js.subs = calloc(16, sizeof(char *));
459   js.allocd = 16;
460
461   dir = opendir(ctx->path);
462   if (!dir) return -1;
463   while ((ent = readdir(dir))) {
464     if (ent->d_name[0] == 'c' && ent->d_name[1] == 'p' && ent->d_name[2] == '.') {
465
466       for (len = 0, p = ent->d_name + 3; *p;) {
467         unsigned char c;
468         int i;
469
470         for (c = 0, i = 0; i < 16; i++) {
471           if (__jlog_hexchars[i] == *p) {
472             c = i << 4;
473             break;
474           }
475         }
476         p++;
477         for (i = 0; i < 16; i++) {
478           if (__jlog_hexchars[i] == *p) {
479             c |= i;
480             break;
481           }
482         }
483         p++;
484         file[len++] = c;
485       }
486       file[len] = '\0';
487
488       js.subs[js.used++] = strdup((char *)file);
489       if(js.used == js.allocd) {
490         js.allocd *= 2;
491         js.subs = realloc(js.subs, js.allocd*sizeof(char *));
492       }
493       js.subs[js.used] = NULL;
494     }
495   }
496   closedir(dir);
497   *subs = js.subs;
498   return js.used;
499 }
500
501 static int __jlog_save_metastore(jlog_ctx *ctx, int ilocked)
502 {
503   struct _jlog_meta_info info;
504 #ifdef DEBUG
505   fprintf(stderr, "__jlog_save_metastore\n");
506 #endif
507
508   if (!ilocked && !jlog_file_lock(ctx->metastore)) {
509     return -1;
510   }
511
512   info.storage_log = ctx->storage.log;
513   info.unit_limit = ctx->unit_limit;
514   info.safety = ctx->safety;
515
516   if (!jlog_file_pwrite(ctx->metastore, &info, sizeof(info), 0)) {
517     if (!ilocked) jlog_file_unlock(ctx->metastore);
518     return -1;
519   }
520   if (ctx->safety == JLOG_SAFE) {
521     jlog_file_sync(ctx->metastore);
522   }
523
524   if (!ilocked) jlog_file_unlock(ctx->metastore);
525   return 0;
526 }
527
528 static int __jlog_restore_metastore(jlog_ctx *ctx, int ilocked)
529 {
530   struct _jlog_meta_info info;
531 #ifdef DEBUG
532   fprintf(stderr, "__jlog_restore_metastore\n");
533 #endif
534
535   if (!ilocked && !jlog_file_lock(ctx->metastore)) {
536     return -1;
537   }
538
539   if (!jlog_file_pread(ctx->metastore, &info, sizeof(info), 0)) {
540     if (!ilocked) jlog_file_unlock(ctx->metastore);
541     return -1;
542   }
543
544   if (!ilocked) jlog_file_unlock(ctx->metastore);
545
546   ctx->storage.log = info.storage_log;
547   ctx->unit_limit = info.unit_limit;
548   ctx->safety = info.safety;
549
550   return 0;
551 }
552
553 int jlog_get_checkpoint(jlog_ctx *ctx, const char *s, jlog_id *id) {
554   jlog_file *f;
555   int rv = -1;
556
557   if(ctx->subscriber_name && !strcmp(ctx->subscriber_name, s)) {
558     if(!ctx->checkpoint) {
559       ctx->checkpoint = __jlog_open_named_checkpoint(ctx, s, 0);
560     }
561     f = ctx->checkpoint;
562   } else
563     f = __jlog_open_named_checkpoint(ctx, s, 0);
564
565   if (f) {
566     if (jlog_file_lock(f)) {
567       if (jlog_file_pread(f, id, sizeof(*id), 0)) rv = 0;
568       jlog_file_unlock(f);
569     }
570   }
571   if (f && f != ctx->checkpoint) jlog_file_close(f);
572   return rv;
573 }
574
575 static int __jlog_set_checkpoint(jlog_ctx *ctx, const char *s, const jlog_id *id)
576 {
577   jlog_file *f;
578   int rv = -1;
579   jlog_id old_id;
580   u_int32_t log;
581
582   if(ctx->subscriber_name && !strcmp(ctx->subscriber_name, s)) {
583     if(!ctx->checkpoint) {
584       ctx->checkpoint = __jlog_open_named_checkpoint(ctx, s, 0);
585     }
586     f = ctx->checkpoint;
587   } else
588     f = __jlog_open_named_checkpoint(ctx, s, 0);
589
590   if(!f) return -1;
591   if (!jlog_file_lock(f))
592     goto failset;
593
594   if (jlog_file_size(f) == 0) {
595     /* we're setting it for the first time, no segments were pending on it */
596     old_id.log = id->log;
597   } else {
598     if (!jlog_file_pread(f, &old_id, sizeof(old_id), 0))
599       goto failset;
600   }
601   if (!jlog_file_pwrite(f, id, sizeof(*id), 0))
602     goto failset;
603   if (ctx->safety == JLOG_SAFE) {
604     jlog_file_sync(f);
605   }
606   jlog_file_unlock(f);
607   rv = 0;
608
609   for (log = old_id.log; log < id->log; log++) {
610     if (__jlog_pending_readers(ctx, log) == 0) {
611       __jlog_unlink_datafile(ctx, log);
612     }
613   }
614
615  failset:
616   if (f && f != ctx->checkpoint) jlog_file_close(f);
617   return rv;
618 }
619
620 static int __jlog_close_metastore(jlog_ctx *ctx) {
621   if (ctx->metastore) {
622     jlog_file_close(ctx->metastore);
623     ctx->metastore = NULL;
624   }
625   return 0;
626 }
627
628 /* path is assumed to be MAXPATHLEN */
629 static char *compute_checkpoint_filename(jlog_ctx *ctx, const char *subscriber, char *name)
630 {
631   const char *sub;
632   int len;
633
634   /* build checkpoint filename */
635   len = strlen(ctx->path);
636   memcpy(name, ctx->path, len);
637   name[len++] = IFS_CH;
638   name[len++] = 'c';
639   name[len++] = 'p';
640   name[len++] = '.';
641   for (sub = subscriber; *sub; ) {
642     name[len++] = __jlog_hexchars[((*sub & 0xf0) >> 4)];
643     name[len++] = __jlog_hexchars[(*sub & 0x0f)];
644     sub++;
645   }
646   name[len] = '\0';
647
648 #ifdef DEBUG
649   fprintf(stderr, "checkpoint %s filename is %s\n", subscriber, name);
650 #endif
651   return name;
652 }
653
654 static jlog_file *__jlog_open_named_checkpoint(jlog_ctx *ctx, const char *cpname, int flags)
655 {
656   char name[MAXPATHLEN];
657   compute_checkpoint_filename(ctx, cpname, name);
658   return jlog_file_open(name, flags, ctx->file_mode);
659 }
660
661 static jlog_file *__jlog_open_reader(jlog_ctx *ctx, u_int32_t log) {
662   char file[MAXPATHLEN];
663
664   if(ctx->current_log != log) {
665     __jlog_close_reader(ctx);
666     __jlog_close_indexer(ctx);
667   }
668   if(ctx->data) {
669     return ctx->data;
670   }
671   STRSETDATAFILE(ctx, file, log);
672 #ifdef DEBUG
673   fprintf(stderr, "opening log file[ro]: '%s'\n", file);
674 #endif
675   ctx->data = jlog_file_open(file, 0, ctx->file_mode);
676   ctx->current_log = log;
677   return ctx->data;
678 }
679
680 static int __jlog_munmap_reader(jlog_ctx *ctx) {
681   if(ctx->mmap_base) {
682     munmap(ctx->mmap_base, ctx->mmap_len);
683     ctx->mmap_base = NULL;
684     ctx->mmap_len = 0;
685   }
686   return 0;
687 }
688
689 static int __jlog_mmap_reader(jlog_ctx *ctx, u_int32_t log) {
690   if(ctx->current_log == log && ctx->mmap_base) return 0;
691   __jlog_open_reader(ctx, log);
692   if(!ctx->data)
693     return -1;
694   if (!jlog_file_map_read(ctx->data, &ctx->mmap_base, &ctx->mmap_len)) {
695     ctx->mmap_base = NULL;
696     ctx->last_error = JLOG_ERR_FILE_READ;
697     ctx->last_errno = errno;
698     return -1;
699   }
700   return 0;
701 }
702
703 static jlog_file *__jlog_open_writer(jlog_ctx *ctx) {
704   char file[MAXPATHLEN];
705
706   if(ctx->data) {
707     /* Still open */
708     return ctx->data;
709   }
710
711   if(!jlog_file_lock(ctx->metastore))
712     SYS_FAIL(JLOG_ERR_LOCK);
713   if(__jlog_restore_metastore(ctx, 1))
714     SYS_FAIL(JLOG_ERR_META_OPEN);
715   STRSETDATAFILE(ctx, file, ctx->storage.log);
716 #ifdef DEBUG
717   fprintf(stderr, "opening log file[rw]: '%s'\n", file);
718 #endif
719   ctx->data = jlog_file_open(file, O_CREAT, ctx->file_mode);
720  finish:
721   jlog_file_unlock(ctx->metastore);
722   return ctx->data;
723 }
724
725 static int __jlog_close_writer(jlog_ctx *ctx) {
726   if (ctx->data) {
727     jlog_file_close(ctx->data);
728     ctx->data = NULL;
729   }
730   return 0;
731 }
732
733 static int __jlog_close_reader(jlog_ctx *ctx) {
734   __jlog_munmap_reader(ctx);
735   if (ctx->data) {
736     jlog_file_close(ctx->data);
737     ctx->data = NULL;
738   }
739   return 0;
740 }
741
742 static int __jlog_close_checkpoint(jlog_ctx *ctx) {
743   if (ctx->checkpoint) {
744     jlog_file_close(ctx->checkpoint);
745     ctx->checkpoint = NULL;
746   }
747   return 0;
748 }
749
750 static jlog_file *__jlog_open_indexer(jlog_ctx *ctx, u_int32_t log) {
751   char file[MAXPATHLEN];
752   int len;
753
754   if(ctx->current_log != log) {
755     __jlog_close_reader(ctx);
756     __jlog_close_indexer(ctx);
757   }
758   if(ctx->index) {
759     return ctx->index;
760   }
761   STRSETDATAFILE(ctx, file, log);
762
763   len = strlen(file);
764   if((len + sizeof(INDEX_EXT)) > sizeof(file)) return NULL;
765   memcpy(file + len, INDEX_EXT, sizeof(INDEX_EXT));
766 #ifdef DEBUG
767   fprintf(stderr, "opening index file: '%s'\n", idx);
768 #endif
769   ctx->index = jlog_file_open(file, O_CREAT, ctx->file_mode);
770   ctx->current_log = log;
771   return ctx->index;
772 }
773
774 static int __jlog_close_indexer(jlog_ctx *ctx) {
775   if (ctx->index) {
776     jlog_file_close(ctx->index);
777     ctx->index = NULL;
778   }
779   return 0;
780 }
781
782 static int
783 ___jlog_resync_index(jlog_ctx *ctx, u_int32_t log, jlog_id *last,
784                      int *closed) {
785   jlog_message_header logmhdr;
786   int i, second_try = 0;
787   off_t index_off, data_off, data_len;
788   u_int64_t index;
789   u_int64_t indices[BUFFERED_INDICES];
790
791   ctx->last_error = JLOG_ERR_SUCCESS;
792   if(closed) *closed = 0;
793
794   __jlog_open_reader(ctx, log);
795   if (!ctx->data) {
796     ctx->last_error = JLOG_ERR_FILE_OPEN;
797     ctx->last_errno = errno;
798     return -1;
799   }
800
801 #define RESTART do { \
802   if (second_try == 0) { \
803     jlog_file_truncate(ctx->index, 0); \
804     jlog_file_unlock(ctx->index); \
805     second_try = 1; \
806     ctx->last_error = JLOG_ERR_SUCCESS; \
807     goto restart; \
808   } \
809   SYS_FAIL(JLOG_ERR_IDX_CORRUPT); \
810 } while (0)
811
812 restart:
813   __jlog_open_indexer(ctx, log);
814   if (!ctx->index) {
815     ctx->last_error = JLOG_ERR_IDX_OPEN;
816     ctx->last_errno = errno;
817     return -1;
818   }
819   if (!jlog_file_lock(ctx->index)) {
820     ctx->last_error = JLOG_ERR_LOCK;
821     ctx->last_errno = errno;
822     return -1;
823   }
824
825   data_off = 0;
826   if ((data_len = jlog_file_size(ctx->data)) == -1)
827     SYS_FAIL(JLOG_ERR_FILE_SEEK);
828   if ((index_off = jlog_file_size(ctx->index)) == -1)
829     SYS_FAIL(JLOG_ERR_IDX_SEEK);
830
831   if (index_off % sizeof(u_int64_t)) {
832 #ifdef DEBUG
833     fprintf(stderr, "corrupt index [%llu]\n", index_off);
834 #endif
835     RESTART;
836   }
837
838   if (index_off > sizeof(u_int64_t)) {
839     if (!jlog_file_pread(ctx->index, &index, sizeof(index),
840                          index_off - sizeof(u_int64_t)))
841     {
842       SYS_FAIL(JLOG_ERR_IDX_READ);
843     }
844     if (index == 0) {
845       /* This log file has been "closed" */
846 #ifdef DEBUG
847       fprintf(stderr, "index closed\n");
848 #endif
849       if(last) {
850         last->log = log;
851         last->marker = (index_off / sizeof(u_int64_t)) - 1;
852       }
853       if(closed) *closed = 1;
854       goto finish;
855     } else {
856       if (index > data_len) {
857 #ifdef DEBUG
858         fprintf(stderr, "index told me to seek somehwere I can't\n");
859 #endif
860         RESTART;
861       }
862       data_off = index;
863     }
864   }
865
866   if (index_off > 0) {
867     /* We are adding onto a partial index so we must advance a record */
868     if (!jlog_file_pread(ctx->data, &logmhdr, sizeof(logmhdr), data_off))
869       SYS_FAIL(JLOG_ERR_FILE_READ);
870     if ((data_off += sizeof(logmhdr) + logmhdr.mlen) > data_len)
871       RESTART;
872   }
873
874   i = 0;
875   while (data_off + sizeof(logmhdr) <= data_len) {
876     off_t next_off = data_off;
877
878     if (!jlog_file_pread(ctx->data, &logmhdr, sizeof(logmhdr), data_off))
879       SYS_FAIL(JLOG_ERR_FILE_READ);
880     if (logmhdr.reserved != 0)
881       SYS_FAIL(JLOG_ERR_FILE_CORRUPT);
882     if ((next_off += sizeof(logmhdr) + logmhdr.mlen) > data_len)
883       break;
884
885     /* Write our new index offset */
886     indices[i++] = data_off;
887     if(i >= BUFFERED_INDICES) {
888 #ifdef DEBUG
889       fprintf(stderr, "writing %i offsets\n", i);
890 #endif
891       if (!jlog_file_pwrite(ctx->index, indices, i * sizeof(u_int64_t), index_off))
892         RESTART;
893       index_off += i * sizeof(u_int64_t);
894       i = 0;
895     }
896     data_off = next_off;
897   }
898   if(i > 0) {
899 #ifdef DEBUG
900     fprintf(stderr, "writing %i offsets\n", i);
901 #endif
902     if (!jlog_file_pwrite(ctx->index, indices, i * sizeof(u_int64_t), index_off))
903       RESTART;
904     index_off += i * sizeof(u_int64_t);
905   }
906   if(last) {
907     last->log = log;
908     last->marker = index_off / sizeof(u_int64_t);
909   }
910   if(log < ctx->storage.log) {
911     if (data_off != data_len) {
912 #ifdef DEBUG
913       fprintf(stderr, "closing index, but %llu != %llu\n", data_off, data_len);
914 #endif
915       SYS_FAIL(JLOG_ERR_FILE_CORRUPT);
916     }
917     /* Special case: if we are closing, we next write a '0'
918      * we can't write the closing marker if the data segment had no records
919      * in it, since it will be confused with an index to offset 0 by the
920      * next reader; this only happens when segments are repaired */
921     if (index_off) {
922       index = 0;
923       if (!jlog_file_pwrite(ctx->index, &index, sizeof(u_int64_t), index_off))
924         RESTART;
925     }
926     if(closed) *closed = 1;
927   }
928 #undef RESTART
929
930 finish:
931   jlog_file_unlock(ctx->index);
932 #ifdef DEBUG
933   fprintf(stderr, "index is %s\n", closed?(*closed?"closed":"open"):"unknown");
934 #endif
935   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
936   return -1;
937 }
938
939 static int __jlog_resync_index(jlog_ctx *ctx, u_int32_t log, jlog_id *last, int *closed) {
940   int attempts, rv = -1;
941   for(attempts=0; attempts<4; attempts++) {
942     rv = ___jlog_resync_index(ctx, log, last, closed);
943     if(ctx->last_error == JLOG_ERR_SUCCESS) break;
944     if(ctx->last_error == JLOG_ERR_FILE_OPEN ||
945        ctx->last_error == JLOG_ERR_IDX_OPEN) break;
946
947     /* We can't fix the file if someone may write to it again */
948     if(log >= ctx->storage.log) break;
949
950     jlog_file_lock(ctx->index);
951     /* it doesn't really matter what jlog_repair_datafile returns
952      * we'll keep retrying anyway */
953     jlog_repair_datafile(ctx, log);
954     jlog_file_truncate(ctx->index, 0);
955     jlog_file_unlock(ctx->index);
956   }
957   return rv;
958 }
959
960 jlog_ctx *jlog_new(const char *path) {
961   jlog_ctx *ctx;
962   ctx = calloc(1, sizeof(*ctx));
963   ctx->unit_limit = DEFAULT_UNIT_LIMIT;
964   ctx->file_mode = DEFAULT_FILE_MODE;
965   ctx->safety = DEFAULT_SAFETY;
966   ctx->context_mode = JLOG_NEW;
967   ctx->path = strdup(path);
968   return ctx;
969 }
970
971 void jlog_set_error_func(jlog_ctx *ctx, jlog_error_func Func, void *ptr) {
972   ctx->error_func = Func;
973   ctx->error_ctx = ptr;
974 }
975
976 size_t jlog_raw_size(jlog_ctx *ctx) {
977   DIR *d;
978   struct dirent *de;
979   size_t totalsize = 0;
980   int ferr, len;
981   char filename[MAXPATHLEN];
982
983   d = opendir(ctx->path);
984   if(!d) return 0;
985   len = strlen(ctx->path);
986   memcpy(filename, ctx->path, len);
987   filename[len++] = IFS_CH;
988   while((de = readdir(d)) != NULL) {
989     struct stat sb;
990     int dlen;
991
992     dlen = strlen(de->d_name);
993     if((len + dlen + 1) > sizeof(filename)) continue;
994     memcpy(filename+len, de->d_name, dlen + 1); /* include \0 */
995     while((ferr = stat(filename, &sb)) == -1 && errno == EINTR);
996     if(ferr == 0 && S_ISREG(sb.st_mode)) totalsize += sb.st_size;
997   }
998   closedir(d);
999   return totalsize;
1000 }
1001
1002 const char *jlog_ctx_err_string(jlog_ctx *ctx) {
1003   switch (ctx->last_error) {
1004 #define MSG_O_MATIC(x)  case x: return #x;
1005     MSG_O_MATIC( JLOG_ERR_SUCCESS);
1006     MSG_O_MATIC( JLOG_ERR_ILLEGAL_INIT);
1007     MSG_O_MATIC( JLOG_ERR_ILLEGAL_OPEN);
1008     MSG_O_MATIC( JLOG_ERR_OPEN);
1009     MSG_O_MATIC( JLOG_ERR_NOTDIR);
1010     MSG_O_MATIC( JLOG_ERR_CREATE_PATHLEN);
1011     MSG_O_MATIC( JLOG_ERR_CREATE_EXISTS);
1012     MSG_O_MATIC( JLOG_ERR_CREATE_MKDIR);
1013     MSG_O_MATIC( JLOG_ERR_CREATE_META);
1014     MSG_O_MATIC( JLOG_ERR_LOCK);
1015     MSG_O_MATIC( JLOG_ERR_IDX_OPEN);
1016     MSG_O_MATIC( JLOG_ERR_IDX_SEEK);
1017     MSG_O_MATIC( JLOG_ERR_IDX_CORRUPT);
1018     MSG_O_MATIC( JLOG_ERR_IDX_WRITE);
1019     MSG_O_MATIC( JLOG_ERR_IDX_READ);
1020     MSG_O_MATIC( JLOG_ERR_FILE_OPEN);
1021     MSG_O_MATIC( JLOG_ERR_FILE_SEEK);
1022     MSG_O_MATIC( JLOG_ERR_FILE_CORRUPT);
1023     MSG_O_MATIC( JLOG_ERR_FILE_READ);
1024     MSG_O_MATIC( JLOG_ERR_FILE_WRITE);
1025     MSG_O_MATIC( JLOG_ERR_META_OPEN);
1026     MSG_O_MATIC( JLOG_ERR_ILLEGAL_WRITE);
1027     MSG_O_MATIC( JLOG_ERR_ILLEGAL_CHECKPOINT);
1028     MSG_O_MATIC( JLOG_ERR_INVALID_SUBSCRIBER);
1029     MSG_O_MATIC( JLOG_ERR_ILLEGAL_LOGID);
1030     MSG_O_MATIC( JLOG_ERR_SUBSCRIBER_EXISTS);
1031     MSG_O_MATIC( JLOG_ERR_CHECKPOINT);
1032     MSG_O_MATIC( JLOG_ERR_NOT_SUPPORTED);
1033     default: return "Unknown";
1034   }
1035 }
1036
1037 int jlog_ctx_err(jlog_ctx *ctx) {
1038   return ctx->last_error;
1039 }
1040
1041 int jlog_ctx_errno(jlog_ctx *ctx) {
1042   return ctx->last_errno;
1043 }
1044
1045 int jlog_ctx_alter_safety(jlog_ctx *ctx, jlog_safety safety) {
1046   if(ctx->context_mode != JLOG_NEW) return -1;
1047   ctx->safety = safety;
1048   return 0;
1049 }
1050 int jlog_ctx_alter_journal_size(jlog_ctx *ctx, size_t size) {
1051   if(ctx->context_mode != JLOG_NEW) return -1;
1052   ctx->unit_limit = size;
1053   return 0;
1054 }
1055 int jlog_ctx_alter_mode(jlog_ctx *ctx, int mode) {
1056   ctx->file_mode = mode;
1057   return 0;
1058 }
1059 int jlog_ctx_open_writer(jlog_ctx *ctx) {
1060   int rv;
1061   struct stat sb;
1062
1063   ctx->last_error = JLOG_ERR_SUCCESS;
1064   if(ctx->context_mode != JLOG_NEW) {
1065     ctx->last_error = JLOG_ERR_ILLEGAL_OPEN;
1066     return -1;
1067   }
1068   ctx->context_mode = JLOG_APPEND;
1069   while((rv = stat(ctx->path, &sb)) == -1 && errno == EINTR);
1070   if(rv == -1) SYS_FAIL(JLOG_ERR_OPEN);
1071   if(!S_ISDIR(sb.st_mode)) SYS_FAIL(JLOG_ERR_NOTDIR);
1072   if(__jlog_open_metastore(ctx) != 0) SYS_FAIL(JLOG_ERR_META_OPEN);
1073   if(__jlog_restore_metastore(ctx, 0)) SYS_FAIL(JLOG_ERR_META_OPEN);
1074  finish:
1075   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1076   ctx->context_mode = JLOG_INVALID;
1077   return -1;
1078 }
1079 int jlog_ctx_open_reader(jlog_ctx *ctx, const char *subscriber) {
1080   int rv;
1081   struct stat sb;
1082   jlog_id dummy;
1083
1084   ctx->last_error = JLOG_ERR_SUCCESS;
1085   if(ctx->context_mode != JLOG_NEW) {
1086     ctx->last_error = JLOG_ERR_ILLEGAL_OPEN;
1087     return -1;
1088   }
1089   ctx->context_mode = JLOG_READ;
1090   ctx->subscriber_name = strdup(subscriber);
1091   while((rv = stat(ctx->path, &sb)) == -1 && errno == EINTR);
1092   if(rv == -1) SYS_FAIL(JLOG_ERR_OPEN);
1093   if(!S_ISDIR(sb.st_mode)) SYS_FAIL(JLOG_ERR_NOTDIR);
1094   if(__jlog_open_metastore(ctx) != 0) SYS_FAIL(JLOG_ERR_META_OPEN);
1095   if(jlog_get_checkpoint(ctx, ctx->subscriber_name, &dummy))
1096     SYS_FAIL(JLOG_ERR_INVALID_SUBSCRIBER);
1097   if(__jlog_restore_metastore(ctx, 0)) SYS_FAIL(JLOG_ERR_META_OPEN);
1098  finish:
1099   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1100   ctx->context_mode = JLOG_INVALID;
1101   return -1;
1102 }
1103 int jlog_ctx_init(jlog_ctx *ctx) {
1104   int rv;
1105   struct stat sb;
1106   int dirmode;
1107
1108   ctx->last_error = JLOG_ERR_SUCCESS;
1109   if(strlen(ctx->path) > MAXLOGPATHLEN-1) {
1110     ctx->last_error = JLOG_ERR_CREATE_PATHLEN;
1111     return -1;
1112   }
1113   if(ctx->context_mode != JLOG_NEW) {
1114     ctx->last_error = JLOG_ERR_ILLEGAL_INIT;
1115     return -1;
1116   }
1117   ctx->context_mode = JLOG_INIT;
1118   while((rv = stat(ctx->path, &sb)) == -1 && errno == EINTR);
1119   if(rv == 0 || errno != ENOENT) {
1120     SYS_FAIL_EX(JLOG_ERR_CREATE_EXISTS, 0);
1121   }
1122   dirmode = ctx->file_mode;
1123   if(dirmode & 0400) dirmode |= 0100;
1124   if(dirmode & 040) dirmode |= 010;
1125   if(dirmode & 04) dirmode |= 01;
1126   if(mkdir(ctx->path, dirmode) == -1)
1127     SYS_FAIL(JLOG_ERR_CREATE_MKDIR);
1128   chmod(ctx->path, dirmode);
1129   /* Setup our initial state and store our instance metadata */
1130   if(__jlog_open_metastore(ctx) != 0)
1131     SYS_FAIL(JLOG_ERR_CREATE_META);
1132   if(__jlog_save_metastore(ctx, 0) != 0)
1133     SYS_FAIL(JLOG_ERR_CREATE_META);
1134  finish:
1135   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1136   return -1;
1137 }
1138 int jlog_ctx_close(jlog_ctx *ctx) {
1139   __jlog_close_indexer(ctx);
1140   __jlog_close_reader(ctx);
1141   __jlog_close_metastore(ctx);
1142   __jlog_close_checkpoint(ctx);
1143   if(ctx->subscriber_name) free(ctx->subscriber_name);
1144   if(ctx->path) free(ctx->path);
1145   free(ctx);
1146   return 0;
1147 }
1148
1149 static int __jlog_metastore_atomic_increment(jlog_ctx *ctx) {
1150   u_int32_t saved_storage_log = ctx->storage.log;
1151 #ifdef DEBUG
1152   fprintf(stderr, "atomic increment on %u\n", saved_storage_log);
1153 #endif
1154   if (!jlog_file_lock(ctx->metastore))
1155     SYS_FAIL(JLOG_ERR_LOCK);
1156   if(__jlog_restore_metastore(ctx, 1))
1157     SYS_FAIL(JLOG_ERR_META_OPEN);
1158   if(ctx->storage.log == saved_storage_log) {
1159     /* We're the first ones to it, so we get to increment it */
1160     ctx->storage.log++;
1161     if(__jlog_save_metastore(ctx, 1))
1162       SYS_FAIL(JLOG_ERR_META_OPEN);
1163   }
1164  finish:
1165   jlog_file_unlock(ctx->metastore);
1166   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1167   return -1;
1168 }
1169 int jlog_ctx_write_message(jlog_ctx *ctx, jlog_message *mess, struct timeval *when) {
1170   struct timeval now;
1171   jlog_message_header hdr;
1172   off_t current_offset;
1173
1174   ctx->last_error = JLOG_ERR_SUCCESS;
1175   if(ctx->context_mode != JLOG_APPEND) {
1176     ctx->last_error = JLOG_ERR_ILLEGAL_WRITE;
1177     ctx->last_errno = EPERM;
1178     return -1;
1179   }
1180  begin:
1181   __jlog_open_writer(ctx);
1182   if(!ctx->data) {
1183     ctx->last_error = JLOG_ERR_FILE_OPEN;
1184     ctx->last_errno = errno;
1185     return -1;
1186   }
1187   if (!jlog_file_lock(ctx->data)) {
1188     ctx->last_error = JLOG_ERR_LOCK;
1189     ctx->last_errno = errno;
1190     return -1;
1191   }
1192
1193   if ((current_offset = jlog_file_size(ctx->data)) == -1)
1194     SYS_FAIL(JLOG_ERR_FILE_SEEK);
1195   if(ctx->unit_limit <= current_offset) {
1196     jlog_file_unlock(ctx->data);
1197     __jlog_close_writer(ctx);
1198     __jlog_metastore_atomic_increment(ctx);
1199     goto begin;
1200   }
1201
1202   hdr.reserved = 0;
1203   if (when) {
1204     hdr.tv_sec = when->tv_sec;
1205     hdr.tv_usec = when->tv_usec;
1206   } else {
1207     gettimeofday(&now, NULL);
1208     hdr.tv_sec = now.tv_sec;
1209     hdr.tv_usec = now.tv_usec;
1210   }
1211   hdr.mlen = mess->mess_len;
1212   if (!jlog_file_pwrite(ctx->data, &hdr, sizeof(hdr), current_offset))
1213     SYS_FAIL(JLOG_ERR_FILE_WRITE);
1214   current_offset += sizeof(hdr);
1215   if (!jlog_file_pwrite(ctx->data, mess->mess, mess->mess_len, current_offset))
1216     SYS_FAIL(JLOG_ERR_FILE_WRITE);
1217   current_offset += mess->mess_len;
1218
1219   if(ctx->unit_limit <= current_offset) {
1220     jlog_file_unlock(ctx->data);
1221     __jlog_close_writer(ctx);
1222     __jlog_metastore_atomic_increment(ctx);
1223     return 0;
1224   }
1225  finish:
1226   jlog_file_unlock(ctx->data);
1227   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1228   return -1;
1229 }
1230 int jlog_ctx_read_checkpoint(jlog_ctx *ctx, const jlog_id *chkpt) {
1231   ctx->last_error = JLOG_ERR_SUCCESS;
1232  
1233   if(ctx->context_mode != JLOG_READ) {
1234     ctx->last_error = JLOG_ERR_ILLEGAL_CHECKPOINT;
1235     ctx->last_errno = EPERM;
1236     return -1;
1237   }
1238   if(__jlog_set_checkpoint(ctx, ctx->subscriber_name, chkpt) != 0) {
1239     ctx->last_error = JLOG_ERR_CHECKPOINT;
1240     ctx->last_errno = 0;
1241     return -1;
1242   }
1243   return 0;
1244 }
1245
1246 int jlog_ctx_remove_subscriber(jlog_ctx *ctx, const char *s) {
1247   char name[MAXPATHLEN];
1248   int rv;
1249
1250   compute_checkpoint_filename(ctx, s, name);
1251   rv = unlink(name);
1252
1253   if (rv == 0) {
1254     ctx->last_error = JLOG_ERR_SUCCESS;
1255     return 1;
1256   }
1257   if (errno == ENOENT) {
1258     ctx->last_error = JLOG_ERR_INVALID_SUBSCRIBER;
1259     return 0;
1260   }
1261   return -1;
1262 }
1263
1264 int jlog_ctx_add_subscriber(jlog_ctx *ctx, const char *s, jlog_position whence) {
1265   jlog_id chkpt;
1266   jlog_ctx *tmpctx = NULL;
1267   jlog_file *jchkpt;
1268   ctx->last_error = JLOG_ERR_SUCCESS;
1269
1270   jchkpt = __jlog_open_named_checkpoint(ctx, s, O_CREAT|O_EXCL);
1271   if(!jchkpt) {
1272     ctx->last_error = JLOG_ERR_SUBSCRIBER_EXISTS;
1273     ctx->last_errno = EEXIST;
1274     return -1;
1275   }
1276   jlog_file_close(jchkpt);
1277  
1278   if(whence == JLOG_BEGIN) {
1279     memset(&chkpt, 0, sizeof(chkpt));
1280     jlog_ctx_first_log_id(ctx, &chkpt);
1281     if(__jlog_set_checkpoint(ctx, s, &chkpt) != 0) {
1282       ctx->last_error = JLOG_ERR_CHECKPOINT;
1283       ctx->last_errno = 0;
1284       return -1;
1285     }
1286     return 0;
1287   }
1288   if(whence == JLOG_END) {
1289     jlog_id start, finish;
1290     memset(&chkpt, 0, sizeof(chkpt));
1291     if(__jlog_open_metastore(ctx) != 0) SYS_FAIL(JLOG_ERR_META_OPEN);
1292     if(__jlog_restore_metastore(ctx, 0))
1293       SYS_FAIL(JLOG_ERR_META_OPEN);
1294     chkpt.log = ctx->storage.log;
1295     if(__jlog_set_checkpoint(ctx, s, &chkpt) != 0)
1296       SYS_FAIL(JLOG_ERR_CHECKPOINT);
1297     tmpctx = jlog_new(ctx->path);
1298     if(jlog_ctx_open_reader(tmpctx, s) < 0) goto finish;
1299     if(jlog_ctx_read_interval(tmpctx, &start, &finish) < 0) goto finish;
1300     jlog_ctx_close(tmpctx);
1301     tmpctx = NULL;
1302     if(__jlog_set_checkpoint(ctx, s, &finish) != 0)
1303       SYS_FAIL(JLOG_ERR_CHECKPOINT);
1304     return 0;
1305   }
1306   ctx->last_error = JLOG_ERR_NOT_SUPPORTED;
1307  finish:
1308   if(tmpctx) jlog_ctx_close(tmpctx);
1309   return -1;
1310 }
1311
1312 int jlog_ctx_write(jlog_ctx *ctx, const void *data, size_t len) {
1313   jlog_message m;
1314   m.mess = (void *)data;
1315   m.mess_len = len;
1316   return jlog_ctx_write_message(ctx, &m, NULL);
1317 }
1318
1319 static int __jlog_find_first_log_after(jlog_ctx *ctx, jlog_id *chkpt,
1320                                 jlog_id *start, jlog_id *finish) {
1321   jlog_id last;
1322   int closed;
1323  
1324   memcpy(start, chkpt, sizeof(*chkpt));
1325  attempt:
1326   if(__jlog_resync_index(ctx, start->log, &last, &closed) != 0) {
1327     if(ctx->last_error == JLOG_ERR_FILE_OPEN &&
1328         ctx->last_errno == ENOENT) {
1329       char file[MAXPATHLEN];
1330       int ferr, len;
1331       struct stat sb = {0};
1332
1333       STRSETDATAFILE(ctx, file, start->log + 1);
1334       while((ferr = stat(file, &sb)) == -1 && errno == EINTR);
1335       /* That file doesn't exist... bad, but we can fake a recovery by
1336          advancing the next file that does exist */
1337       ctx->last_error = JLOG_ERR_SUCCESS;
1338       if(start->log >= ctx->storage.log || ferr != 0 || sb.st_size == 0) {
1339         /* We don't advance past where people are writing */
1340         memcpy(finish, start, sizeof(*start));
1341         return 0;
1342       }
1343       if(__jlog_resync_index(ctx, start->log + 1, &last, &closed) != 0) {
1344         /* We don't advance past where people are writing */
1345         memcpy(finish, start, sizeof(*start));
1346         return 0;
1347       }
1348       len = strlen(file);
1349       if((len + sizeof(INDEX_EXT)) > sizeof(file)) return -1;
1350       memcpy(file + len, INDEX_EXT, sizeof(INDEX_EXT));
1351       while((ferr = stat(file, &sb)) == -1 && errno == EINTR);
1352       if(ferr != 0 || sb.st_size == 0) {
1353         /* We don't advance past where people are writing */
1354         memcpy(finish, start, sizeof(*start));
1355         return 0;
1356       }
1357       start->marker = 0;
1358       start->log++;  /* BE SMARTER! */
1359       goto attempt;
1360     }
1361     return -1; /* Just persist resync's error state */
1362   }
1363
1364   /* If someone checkpoints off the end, be nice */
1365   if(last.log == start->log && last.marker < start->marker)
1366     memcpy(start, &last, sizeof(*start));
1367
1368   if(!memcmp(start, &last, sizeof(last)) && closed) {
1369     char file[MAXPATHLEN];
1370     int ferr, len;
1371     struct stat sb = {0};
1372
1373     STRSETDATAFILE(ctx, file, start->log + 1);
1374     while((ferr = stat(file, &sb)) == -1 && errno == EINTR);
1375     if(start->log >= ctx->storage.log || ferr != 0 || sb.st_size == 0) {
1376       /* We don't advance past where people are writing */
1377       memcpy(finish, start, sizeof(*start));
1378       return 0;
1379     }
1380     if(__jlog_resync_index(ctx, start->log + 1, &last, &closed) != 0) {
1381       /* We don't advance past where people are writing */
1382       memcpy(finish, start, sizeof(*start));
1383       return 0;
1384     }
1385     len = strlen(file);
1386     if((len + sizeof(INDEX_EXT)) > sizeof(file)) return -1;
1387     memcpy(file + len, INDEX_EXT, sizeof(INDEX_EXT));
1388     while((ferr = stat(file, &sb)) == -1 && errno == EINTR);
1389     if(ferr != 0 || sb.st_size == 0) {
1390       /* We don't advance past where people are writing */
1391       memcpy(finish, start, sizeof(*start));
1392       return 0;
1393     }
1394     start->marker = 0;
1395     start->log++;
1396     goto attempt;
1397   }
1398   memcpy(finish, &last, sizeof(last));
1399   return 0;
1400 }
1401 int jlog_ctx_read_message(jlog_ctx *ctx, const jlog_id *id, jlog_message *m) {
1402   off_t index_len;
1403   u_int64_t data_off;
1404
1405   ctx->last_error = JLOG_ERR_SUCCESS;
1406   if (ctx->context_mode != JLOG_READ)
1407     SYS_FAIL(JLOG_ERR_ILLEGAL_WRITE);
1408   if (id->marker < 1) {
1409     SYS_FAIL(JLOG_ERR_ILLEGAL_LOGID);
1410   }
1411
1412   __jlog_open_reader(ctx, id->log);
1413   if(!ctx->data)
1414     SYS_FAIL(JLOG_ERR_FILE_OPEN);
1415   __jlog_open_indexer(ctx, id->log);
1416   if(!ctx->index)
1417     SYS_FAIL(JLOG_ERR_IDX_OPEN);
1418
1419   if ((index_len = jlog_file_size(ctx->index)) == -1)
1420     SYS_FAIL(JLOG_ERR_IDX_SEEK);
1421   if (index_len % sizeof(u_int64_t))
1422     SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
1423   if (id->marker * sizeof(u_int64_t) > index_len) {
1424     SYS_FAIL(JLOG_ERR_ILLEGAL_LOGID);
1425   }
1426
1427   if (!jlog_file_pread(ctx->index, &data_off, sizeof(u_int64_t),
1428                        (id->marker - 1) * sizeof(u_int64_t)))
1429   {
1430     SYS_FAIL(JLOG_ERR_IDX_READ);
1431   }
1432   if (data_off == 0 && id->marker != 1) {
1433     if (id->marker * sizeof(u_int64_t) == index_len) {
1434       /* close tag; not a real offset */
1435       SYS_FAIL(JLOG_ERR_ILLEGAL_LOGID);
1436     } else {
1437       /* an offset of 0 in the middle of an index means curruption */
1438       SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
1439     }
1440   }
1441
1442   if(__jlog_mmap_reader(ctx, id->log) != 0)
1443     SYS_FAIL(JLOG_ERR_FILE_READ);
1444
1445   if(data_off > ctx->mmap_len - sizeof(jlog_message_header)) {
1446 #ifdef DEBUG
1447     fprintf(stderr, "read idx off end: %llu\n", data_off);
1448 #endif
1449     SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
1450   }
1451
1452   memcpy(&m->aligned_header, ((u_int8_t *)ctx->mmap_base) + data_off,
1453          sizeof(jlog_message_header));
1454
1455   if(data_off + sizeof(jlog_message_header) + m->aligned_header.mlen > ctx->mmap_len) {
1456 #ifdef DEBUG
1457     fprintf(stderr, "read idx off end: %llu %llu\n", data_off, ctx->mmap_len);
1458 #endif
1459     SYS_FAIL(JLOG_ERR_IDX_CORRUPT);
1460   }
1461
1462   m->header = &m->aligned_header;
1463   m->mess_len = m->header->mlen;
1464   m->mess = (((u_int8_t *)ctx->mmap_base) + data_off + sizeof(jlog_message_header));
1465
1466  finish:
1467   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1468   if (ctx->last_error == JLOG_ERR_IDX_CORRUPT) {
1469     if (jlog_file_lock(ctx->index)) {
1470       jlog_file_truncate(ctx->index, 0);
1471       jlog_file_unlock(ctx->index);
1472     }
1473   }
1474   return -1;
1475 }
1476 int jlog_ctx_read_interval(jlog_ctx *ctx, jlog_id *start, jlog_id *finish) {
1477   jlog_id chkpt;
1478   int count = 0;
1479
1480   ctx->last_error = JLOG_ERR_SUCCESS;
1481   if(ctx->context_mode != JLOG_READ) {
1482     ctx->last_error = JLOG_ERR_ILLEGAL_WRITE;
1483     ctx->last_errno = EPERM;
1484     return -1;
1485   }
1486
1487   __jlog_restore_metastore(ctx, 0);
1488   if(jlog_get_checkpoint(ctx, ctx->subscriber_name, &chkpt))
1489     SYS_FAIL(JLOG_ERR_INVALID_SUBSCRIBER);
1490   if(__jlog_find_first_log_after(ctx, &chkpt, start, finish) != 0)
1491     goto finish; /* Leave whatever error was set in find_first_log_after */
1492   if(start->log != chkpt.log) start->marker = 0;
1493   else start->marker = chkpt.marker;
1494   if(start->log != chkpt.log) {
1495     /* We've advanced our checkpoint, let's not do this work again */
1496     if(__jlog_set_checkpoint(ctx, ctx->subscriber_name, start) != 0)
1497       SYS_FAIL(JLOG_ERR_CHECKPOINT);
1498   }
1499   /* Here 'start' is actually the checkpoint, so we must advance it one.
1500      However, that may not be possible, if there are no messages, so first
1501      make sure finish is bigger */
1502   count = finish->marker - start->marker;
1503   if(finish->marker > start->marker) start->marker++;
1504
1505   /* We need to munmap it, so that we can remap it with more data if needed */
1506   __jlog_munmap_reader(ctx);
1507  finish:
1508   if(ctx->last_error == JLOG_ERR_SUCCESS) return count;
1509   return -1;
1510 }
1511
1512 int jlog_ctx_first_log_id(jlog_ctx *ctx, jlog_id *id) {
1513   DIR *d;
1514   struct dirent *de;
1515   ctx->last_error = JLOG_ERR_SUCCESS;
1516   u_int32_t log;
1517   int found = 0;
1518
1519   id->log = 0xffffffff;
1520   id->marker = 0;
1521   d = opendir(ctx->path);
1522   if (!d) return -1;
1523
1524   while ((de = readdir(d))) {
1525     int i;
1526     char *cp = de->d_name;
1527     if(strlen(cp) != 8) continue;
1528     log = 0;
1529     for(i=0;i<8;i++) {
1530       log <<= 4;
1531       if(cp[i] >= '0' && cp[i] <= '9') log |= (cp[i] - '0');
1532       else if(cp[i] >= 'a' && cp[i] <= 'f') log |= (cp[i] - 'a' + 0xa);
1533       else if(cp[i] >= 'A' && cp[i] <= 'F') log |= (cp[i] - 'A' + 0xa);
1534       else break;
1535     }
1536     if(i != 8) continue;
1537     found = 1;
1538     if(log < id->log) id->log = log;
1539   }
1540   if(!found) id->log = 0;
1541   closedir(d);
1542   return 0;
1543 }
1544
1545 int jlog_ctx_last_log_id(jlog_ctx *ctx, jlog_id *id) {
1546   ctx->last_error = JLOG_ERR_SUCCESS;
1547   if(ctx->context_mode != JLOG_READ) {
1548     ctx->last_error = JLOG_ERR_ILLEGAL_WRITE;
1549     ctx->last_errno = EPERM;
1550     return -1;
1551   }
1552   if (__jlog_restore_metastore(ctx, 0) != 0) return -1;
1553   ___jlog_resync_index(ctx, ctx->storage.log, id, NULL);
1554   if(ctx->last_error == JLOG_ERR_SUCCESS) return 0;
1555   return -1;
1556 }
1557
1558 int jlog_ctx_advance_id(jlog_ctx *ctx, jlog_id *cur,
1559                         jlog_id *start, jlog_id *finish)
1560 {
1561   int rv;
1562   if(memcmp(cur, finish, sizeof(jlog_id))) {
1563     start->marker++;
1564   } else {
1565     if((rv = __jlog_find_first_log_after(ctx, cur, start, finish)) != 0) {
1566       return rv;
1567     }
1568     if(cur->log != start->log) {
1569       start->marker = 1;
1570     }
1571     else start->marker = cur->marker;
1572   }
1573   return 0;
1574 }
1575
1576 /* vim:se ts=2 sw=2 et: */
Note: See TracBrowser for help on using the browser.