root/trunk/spreadlogd-kqueue.c

Revision 27, 8.7 kB (checked in by jesus, 11 years ago)

remove debugging line

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1 /* ======================================================================
2  * Copyright (c) 2000 Theo Schlossnagle
3  * All rights reserved.
4  * The following code was written by Theo Schlossnagle <jesus@omniti.com>
5  * This code was written to facilitate clustered logging via Spread.
6  * More information on Spread can be found at http://www.spread.org/
7  * Please refer to the LICENSE file before using this software.
8  * ======================================================================
9 */
10
11 #include <stdio.h>
12 #include <stdlib.h>
13 #include <string.h>
14 #include <signal.h>
15 #include <unistd.h>
16 #include <sys/types.h>
17 #include <sys/event.h>
18 #include <sys/time.h>
19 #include <sys/resource.h>
20 #include <errno.h>
21 #include <sp.h>
22
23 #include "config.h"
24
25 #define SPREADLOGD_VERSION "1.5.0-kq"
26
27 extern char *optarg;
28 extern int optind, opterr, optopt;
29
30 int verbose = 0;
31 int extralog = 0;
32 int terminate = 0;
33 int huplogs = 0;
34 int skiplocking = 0;
35 int buffsize = -1;
36 int fdsetsize;
37 int nr_open;
38
39 static char *default_configfile = "/etc/spreadlogd.conf";
40
41 void usage(char *progname) {
42   fprintf(stderr, "%s\t\tVERSION: %s\n \
43 \t-c configfile\t\t[default /etc/spreadlogd.conf]\n \
44 \t-s\t\t\tskip locking (flock) files (NOT RECOMMENDED)\n \
45 \t-v\t\t\tverbose mode\n \
46 \t-x\t\t\tlog errors talking with spread\n \
47 \t-D\t\t\tdo not daemonize (debug)\n \
48 \t-V\t\t\tshow version information\n", progname, SPREADLOGD_VERSION);
49   exit(1);
50 }
51
52 static int kqueue_fd = -1;
53 static struct kevent *ke_vec;
54 static unsigned ke_vec_a = 0;
55 static unsigned ke_vec_used = 0;
56
57 static void
58 ke_change (register int const ident,
59            register int const filter,
60            register int const flags,
61            register void *const udata)
62 {
63   enum { initial_alloc = 64 };
64   register struct kevent *kep;
65
66   if (!ke_vec_a)
67     {
68       ke_vec_a = initial_alloc;
69       ke_vec = (struct kevent *) malloc(ke_vec_a * sizeof (struct kevent));
70     }
71   else if (ke_vec_used == ke_vec_a)
72     {
73       ke_vec_a <<= 1;
74       ke_vec =
75         (struct kevent *) realloc (ke_vec,
76                                    ke_vec_a * sizeof (struct kevent));
77     }
78   kep = &ke_vec[ke_vec_used++];
79
80   kep->ident = ident;
81   kep->filter = filter;
82   kep->flags = flags;
83   kep->fflags = 0;
84   kep->data = 0;
85   kep->udata = udata;
86
87   if(verbose)
88         fprintf(stderr, "kevent schedule: fd %d filter %d flags %d data %p\n",
89                 ident, filter, flags, udata);   
90 }
91 void sig_handler(int signum) {
92   /* Set a "hup my logs" flag */
93   if(signum == SIGHUP)
94     huplogs = 1;
95   else if(signum == SIGTERM)
96     terminate = 1;
97 }
98
99 int join(LogFacility *lf, void *vpfd) {
100   int ret;
101   int fd = *(int *)vpfd;
102   if((ret = SP_join(fd, lf->groupname)) == 0) {
103     if(verbose)
104       fprintf(stderr, "Joined %s.\n", lf->groupname);
105   }
106   return ret;
107 }
108
109 int connectandjoin(SpreadConfiguration *sc, void *uv) {
110   int mbox;
111   int *tojoin = (int *)uv;
112   char sld[MAX_GROUP_NAME];
113   snprintf(sld, MAX_GROUP_NAME, "sld-%05d", getpid());
114   if(sc->connected || SP_connect(config_get_spreaddaemon(sc),
115                                  sld, 1, 0, &mbox,
116                                  sc->private_group) == ACCEPT_SESSION) {
117     if(!sc->connected) {
118       if(verbose)
119         fprintf(stderr, "Successfully connected to spread at %s%c%s\n",
120                 (sc->host)?sc->host:"/tmp",
121                 (sc->host)?':':'/',
122                 sc->port);
123     }
124     sc->connected = 1;
125     if(*tojoin)
126       config_foreach_logfacility(sc, join, &mbox);
127     ke_change(mbox, EVFILT_READ, EV_ADD|EV_ENABLE, sc);
128     return mbox;
129   } else {
130     if(verbose)
131       fprintf(stderr, "Failed connection to spread at %s%c%s\n",
132               (sc->host)?sc->host:"/tmp",
133               (sc->host)?':':'/',
134               sc->port);
135     sc->connected = 0;
136   }
137   return -1; 
138 }
139
140 int establish_spread_connections() {
141   int tojoin = 1;
142   return config_foreach_spreadconf(connectandjoin, (void *)&tojoin);
143 }
144
145 void handle_signals() {
146   if(terminate) {
147     if(extralog) fprintf(stderr, "Received SIGTERM, closing log files.\n");
148     config_close();
149     if(extralog) fprintf(stderr, "Log files closed, exiting.\n");
150     exit(0);
151   }
152   if(huplogs) {
153     huplogs = 0;
154     config_hup();
155   }
156 }
157 void daemonize(void) {
158   if(fork()!=0) exit(0);
159   setsid();
160   if(fork()!=0) exit(0);
161 }
162 int getnropen(void) {
163   struct rlimit rlim;
164   getrlimit(RLIMIT_NOFILE, &rlim);
165   rlim.rlim_cur = rlim.rlim_max;
166   setrlimit(RLIMIT_NOFILE, &rlim);
167   return rlim.rlim_cur;
168 }
169
170 int main(int argc, char **argv) {
171 #ifdef SPREAD_VERSION
172   int mver, miver, pver;
173 #endif
174   char *configfile = default_configfile;
175   char *message;
176   int getoption, debug = 0;
177   struct sigaction signalaction;
178   sigset_t ourmask;
179         nr_open = getnropen();
180
181   fdsetsize = getdtablesize();
182
183   while((getoption = getopt(argc, argv, "b:c:svxDV")) != -1) {
184     switch(getoption) {
185     case 'b':
186       buffsize = atoi(optarg);
187     case 'c':
188       configfile = optarg;
189       break;
190     case 's':
191       skiplocking = 1;
192       break;
193     case 'v':
194       verbose = 1;
195       break;
196     case 'x':
197       extralog = 1;
198       break;
199     case 'D':
200       debug = 1;
201       break;
202     default:
203       usage(argv[0]);
204     }
205   }
206  
207   /* Read our configuration */
208   if(config_init(configfile)) exit(-1);
209
210   if(buffsize<0) buffsize = 1024*8; /* 8k buffer (like Apache) */
211   message = (char *)malloc(buffsize*sizeof(char));
212
213   if(verbose) {
214     fprintf(stderr, "running spreadlogd as %s\n\tconfigfile:\t\t%s\n\tdebug:\t\t%s\n\tverbose:\t\t%s\n\tlog spread errors:\t%s\n\tbuffer size:\t\t%d\n",
215             argv[0],
216             configfile,
217             (debug)?"YES":"NO",
218             (verbose)?"YES":"NO",
219             (extralog)?"YES":"NO",
220             buffsize);
221   }
222
223   if(!debug) daemonize();
224  
225   /* Set up HUP signal */
226   signalaction.sa_handler = sig_handler;
227   sigemptyset(&signalaction.sa_mask);
228   signalaction.sa_flags = 0;
229   if(sigaction(SIGHUP, &signalaction, NULL)) {
230     fprintf(stderr, "An error occured while registering a SIGHUP handler\n");
231     perror("sigaction");
232   }
233   if(sigaction(SIGTERM, &signalaction, NULL)) {
234     fprintf(stderr, "An error occured while registering a SIGTERM handler\n");
235     perror("sigaction");
236   }
237   sigemptyset(&ourmask);
238   sigaddset(&ourmask, SIGHUP);
239   sigprocmask(SIG_UNBLOCK, &ourmask, NULL);
240
241   kqueue_fd = kqueue();
242   if (kqueue_fd < 0) {
243     fprintf(stderr, "Cannot create kqueue: %s\n", strerror(errno));
244     exit(-1);
245   }
246
247   /* Connect to spread */
248   while(1) {
249     int fd, tojoin;
250     sp_time lasttry, thistry, timediff;
251     service service_type;
252     char sender[MAX_GROUP_NAME];
253     char *pmessage;
254     int len, num_groups, endian, logfd;
255     char groups[1][MAX_GROUP_NAME];
256     int16 mess_type;
257 #ifdef DROP_RECV
258     service_type = DROP_RECV;
259 #endif
260
261     establish_spread_connections();
262     lasttry = E_get_time();
263     while(1) {
264       /* Build out select */
265       struct timespec tspec;
266       int ret;
267
268       tspec.tv_sec = 1L;
269       tspec.tv_nsec = 0L;
270       ret = kevent(kqueue_fd, ke_vec, ke_vec_used, ke_vec, ke_vec_a, &tspec);
271       if(ret < 0) {
272         fprintf(stderr, "kevent error: %s\n", strerror(errno));
273       }
274       if(ret > 0) {
275         struct kevent *kep;
276         for(kep = ke_vec; kep < &ke_vec[ret]; kep++) {
277           SpreadConfiguration *sc = (SpreadConfiguration *)kep->udata;
278           fd = kep->ident;
279           if(sc && (fd>0) && kep->filter==EVFILT_READ) {
280             len = SP_receive(fd, &service_type, sender,
281                              1, &num_groups, groups,
282                              &mess_type, &endian, buffsize, message);
283             /* Handle errors correctly */
284             if(len == ILLEGAL_SESSION || len == CONNECTION_CLOSED ||
285                len == ILLEGAL_MESSAGE || len == BUFFER_TOO_SHORT) {
286               if(extralog) {
287                 fprintf(stderr, "Error receiving from spread:\n\t");
288                 SP_error(len);
289               }
290               /* These are errors that require reestablishing a connection */
291               if(len == ILLEGAL_SESSION || len == CONNECTION_CLOSED) {
292                 /* So, let's try */
293                 int retval;
294
295                 if(extralog) {
296                   fprintf(stderr, "Terminal error closing spread mailbox %d\n",
297                           fd);
298                 }
299                 ke_change(fd, EVFILT_READ, EV_DELETE|EV_DISABLE, NULL);
300                 SP_disconnect(fd);
301                 tojoin = 1;
302                 sc->connected = 0;
303                 retval = connectandjoin(sc, &tojoin);
304                 if(retval < 0 && extralog)
305                   fprintf(stderr, "Error connecting to spread daemon\n");
306               }
307             } else if(Is_regular_mess(service_type)) {
308               logfd = config_get_fd(sc, groups[0], message);
309               if(logfd<0) continue;
310               pmessage = config_process_message(sc,groups[0], message, &len);
311 #ifdef PERL
312               config_do_external_perl(sc, sender, groups[0], message);
313 #endif
314 #ifdef PYTHON
315               config_do_external_python(sc, sender, groups[0], message);
316 #endif
317               if(logfd < 0) continue;
318               write(logfd, pmessage, len);
319             }
320 #ifdef DROP_RECV
321             /* Set DROP_RECV flag if we can */
322             service_type = DROP_RECV;
323 #endif
324           }
325         }
326       }
327       handle_signals();
328       thistry = E_get_time();
329       timediff = E_sub_time(thistry, lasttry);
330       if(timediff.sec > 5) {
331         lasttry = thistry;
332         tojoin = 1;
333         config_foreach_spreadconf(connectandjoin, (void *)&tojoin);
334       }
335     }
336   }
337   return -1;
338 }
Note: See TracBrowser for help on using the browser.