root/src/jlog/jthreadtest.c

Revision cc981f5c55100675865b6960135d70148d8585af, 7.8 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 3 years ago)

pull in jlog @ 52

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2005-2008, Message Systems, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *    * Redistributions of source code must retain the above copyright
10  *      notice, this list of conditions and the following disclaimer.
11  *    * Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials provided
14  *      with the distribution.
15  *    * Neither the name Message Systems, Inc. nor the names
16  *      of its contributors may be used to endorse or promote products
17  *      derived from this software without specific prior written
18  *      permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include <stdio.h>
34 #include <pthread.h>
35 #include <unistd.h>
36
37 #include "jlog_config.h"
38 #include "jlog.h"
39
40 #define MASTER     "master"
41 #define LOGNAME    "/tmp/jtest.foo"
42
43 int writer_done = 0;
44 int only_read = 0;
45 int only_write = 0;
46
47 static void _croak(int lineno)
48 {
49   fprintf(stderr, "croaked at line %d\n", lineno);
50   exit(2);
51 }
52 #define croak() _croak(__LINE__)
53
54 void jcreate(jlog_safety s) {
55   jlog_ctx *ctx;
56   const char *label = NULL;
57  
58   switch (s) {
59     case JLOG_ALMOST_SAFE: label = "almost safe"; break;
60     case JLOG_UNSAFE:      label = "unsafe"; break;
61     case JLOG_SAFE:        label = "safe"; break;
62   }
63   fprintf(stderr, "jcreate %s in %s mode\n", LOGNAME, label);
64  
65   ctx = jlog_new(LOGNAME);
66   jlog_ctx_alter_journal_size(ctx, 102400);
67   jlog_ctx_alter_safety(ctx, s);
68   if(jlog_ctx_init(ctx) != 0) {
69     fprintf(stderr, "jlog_ctx_init failed: %d %s\n", jlog_ctx_err(ctx), jlog_ctx_err_string(ctx));
70     if(jlog_ctx_err(ctx) != JLOG_ERR_CREATE_EXISTS) exit(0);
71   } else {
72     jlog_ctx_add_subscriber(ctx, MASTER, JLOG_BEGIN);
73   }
74   jlog_ctx_close(ctx);
75 }
76
77 void *writer(void *unused) {
78   jlog_ctx *ctx;
79   int i;
80   char foo[1523];
81   ctx = jlog_new(LOGNAME);
82   memset(foo, 'X', sizeof(foo)-1);
83   foo[sizeof(foo)-1] = '\0';
84   if(jlog_ctx_open_writer(ctx) != 0) {
85     fprintf(stderr, "jlog_ctx_open_writer failed: %d %s\n", jlog_ctx_err(ctx), jlog_ctx_err_string(ctx));
86     croak();
87   }
88   for(i=0;i<1000;i++) {
89     int rv;
90     fprintf(stderr, "writer...\n");
91     rv = jlog_ctx_write(ctx, foo, strlen(foo));
92     if(rv != 0) {
93       fprintf(stderr, "jlog_ctx_write_message failed: %d %s\n", jlog_ctx_err(ctx), jlog_ctx_err_string(ctx));
94       /* abort(); */
95     }
96   }
97   jlog_ctx_close(ctx);
98   writer_done = 1;
99   return 0;
100 }
101
102 void *reader(void *unused) {
103   jlog_ctx *ctx;
104   char subname[32];
105   int tcount = 0;
106   int prev_err = 0;
107   int subno = (int)unused;
108   snprintf(subname, sizeof(subname), "sub-%02d", subno);
109 reader_retry:
110   ctx = jlog_new(LOGNAME);
111   if(jlog_ctx_open_reader(ctx, subname) != 0) {
112     if(prev_err == 0) {
113       prev_err = jlog_ctx_err(ctx);
114       jlog_ctx_close(ctx);
115       ctx = jlog_new(LOGNAME);
116       if(prev_err == JLOG_ERR_INVALID_SUBSCRIBER) {
117         fprintf(stderr, "[%02d] invalid subscriber, init...\n", subno);
118         if(jlog_ctx_open_writer(ctx) != 0) {
119           fprintf(stderr, "[%02d] jlog_ctx_open_writer failed: %d %s\n", subno, jlog_ctx_err(ctx), jlog_ctx_err_string(ctx));
120         } else {
121           if(jlog_ctx_add_subscriber(ctx, subname, JLOG_BEGIN) != 0) {
122             fprintf(stderr, "[%02d] jlog_ctx_add_subscriber failed: %d %s\n", subno, jlog_ctx_err(ctx), jlog_ctx_err_string(ctx));
123           } else {
124             jlog_ctx_close(ctx);
125             goto reader_retry;
126           }
127         }
128       }
129     }
130     fprintf(stderr, "[%02d] jlog_ctx_open_reader failed: %d %s\n", subno, jlog_ctx_err(ctx), jlog_ctx_err_string(ctx));
131     croak();
132   }
133   fprintf(stderr, "[%02d] reader started\n", subno);
134   while(1) {
135     char begins[20], ends[20];
136     jlog_id begin, end;
137     int count;
138     jlog_message message;
139     if((count = jlog_ctx_read_interval(ctx, &begin, &end)) == -1) {
140       fprintf(stderr, "jlog_ctx_read_interval failed: %d %s\n", jlog_ctx_err(ctx), jlog_ctx_err_string(ctx));
141       croak();
142     }
143     jlog_snprint_logid(begins, sizeof(begins), &begin);
144     jlog_snprint_logid(ends, sizeof(ends), &end);
145     if(count > 0) {
146       int i;
147       fprintf(stderr, "[%02d] reader (%s, %s] count: %d\n", subno, begins, ends, count);
148       for(i=0; i<count; i++, JLOG_ID_ADVANCE(&begin)) {
149         end = begin;
150         if(jlog_ctx_read_message(ctx, &begin, &message) != 0) {
151           jlog_snprint_logid(begins, sizeof(begins), &begin);
152           fprintf(stderr, "[%02d] read failed @ %s: %d %s\n", subno, begins, jlog_ctx_err(ctx), jlog_ctx_err_string(ctx));
153         } else {
154           tcount++;
155           jlog_snprint_logid(begins, sizeof(begins), &begin);
156           /* fprintf(stderr, "[%02d] read: [%s]\n\t'%.*s'\n", subno, begins,
157                   message.mess_len, (char *)message.mess); */
158         }
159       }
160       if(jlog_ctx_read_checkpoint(ctx, &end) != 0) {
161         fprintf(stderr, "[%02d] checkpoint failed: %d %s\n", subno, jlog_ctx_err(ctx), jlog_ctx_err_string(ctx));
162       } else {
163         /* fprintf(stderr, "[%02d] \tcheckpointed...\n", subno); */
164       }
165     } else {
166       if(writer_done == 1) break;
167     }
168   }
169   jlog_ctx_close(ctx);
170   return (void *)tcount;
171 }
172
173 static void usage(void)
174 {
175   fprintf(stderr,
176           "usage: jthreadtest safety [safe|unsafe|almost_safe]\n"
177           "       jthreadtest remove [subscriber]\n\n");
178   exit(1);
179 }
180
181 #define THRCNT 20
182 int main(int argc, char **argv) {
183   int i;
184   char *toremove = NULL;
185   jlog_safety safety = JLOG_ALMOST_SAFE;
186   pthread_t tid[THRCNT];
187   void *foo;
188  
189 #if _WIN32
190   mem_init();
191 #endif
192
193   if(argc == 3) {
194     if(!strcmp(argv[1], "safety")) {
195       if(!strcmp(argv[2], "unsafe"))
196         safety = JLOG_UNSAFE;
197       else if(!strcmp(argv[2], "almost_safe"))
198         safety = JLOG_ALMOST_SAFE;
199       else if(!strcmp(argv[2], "safe"))
200         safety = JLOG_SAFE;
201       else {
202         fprintf(stderr, "invalid safety option\n");
203         usage();
204       }
205     } else if(!strcmp(argv[1], "only")) {
206       if(!strcmp(argv[2], "read")) only_read = 1;
207       else if(!strcmp(argv[2], "write")) only_write = 1;
208       else usage();
209     } else if(!strcmp(argv[1], "remove")) {
210       toremove = argv[2];
211     } else {
212       usage();
213     }
214   } else if(argc < 3 || argc > 3) {
215     usage();
216   }
217
218   jcreate(safety);
219
220   if(toremove) {
221     jlog_ctx *ctx;
222     ctx = jlog_new(LOGNAME);
223     if(jlog_ctx_open_writer(ctx) != 0) {
224       fprintf(stderr, "jlog_ctx_open_writer failed: %d %s\n", jlog_ctx_err(ctx), jlog_ctx_err_string(ctx));
225       croak();
226     }
227     jlog_ctx_remove_subscriber(ctx, argv[2]);
228     jlog_ctx_close(ctx);
229     exit(0);
230   }
231   if(!only_write) {
232     for(i=0; i<THRCNT; i++) {
233       pthread_create(&tid[i], NULL, reader, (void *)i);
234       fprintf(stderr, "[%d] started reader\n", (int)tid[i]);
235     }
236   }
237   if(!only_read) {
238     fprintf(stderr, "starting writer...\n");
239     writer(NULL);
240   } else {
241     sleep(5);
242     writer_done = 1;
243   }
244   if(!only_write) {
245     for(i=0; i<THRCNT; i++) {
246       pthread_join(tid[i], &foo);
247       fprintf(stderr, "[%d] joined, read %d\n", i, (int)foo);
248     }
249   }
250   return 0;
251 }
252 /* vim:se ts=2 sw=2 et: */
Note: See TracBrowser for help on using the browser.