root/trunk/spreadlogd.c

Revision 21, 8.7 kB (checked in by jesus, 12 years ago)

failure bugfix

  • Property svn:eol-style set to native
  • Property svn:executable set to *
  • Property svn:keywords set to Author Date Id Revision
Line 
1 /* ======================================================================
2  * Copyright (c) 2000 Theo Schlossnagle
3  * All rights reserved.
4  * The following code was written by Theo Schlossnagle <jesus@omniti.com>
5  * This code was written to facilitate clustered logging via Spread.
6  * More information on Spread can be found at http://www.spread.org/
7  * Please refer to the LICENSE file before using this software.
8  * ======================================================================
9 */
10
#include <errno.h>
#include <limits.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/resource.h>
#include <sys/time.h>

#include <sp.h>

#include "config.h"
22
23 #define SPREADLOGD_VERSION "1.4.3"
24
25 #define _TODO_JOIN 1
26 #define _TODO_PARANOID_CONNECT 2
27
28 #ifndef FD_SETSIZE
29 #define FD_SETSIZE 1024
30 #endif
31
32 extern char *optarg;
33 extern int optind, opterr, optopt;
34
35 int verbose = 0;
36 int extralog = 0;
37 int terminate = 0;
38 int huplogs = 0;
39 int skiplocking = 0;
40 int buffsize = -1;
41 SpreadConfiguration **fds;
42 int fdsetsize;
43 int nr_open;
44
45 static char *default_configfile = "/etc/spreadlogd.conf";
46
47 void usage(char *progname) {
48   fprintf(stderr, "%s\t\tVERSION: %s\n \
49 \t-c configfile\t\t[default /etc/spreadlogd.conf]\n \
50 \t-s\t\t\tskip locking (flock) files (NOT RECOMMENDED)\n \
51 \t-v\t\t\tverbose mode\n \
52 \t-x\t\t\tlog errors talking with spread\n \
53 \t-D\t\t\tdo not daemonize (debug)\n \
54 \t-V\t\t\tshow version information\n", progname, SPREADLOGD_VERSION);
55   exit(1);
56 }
57
58 void sig_handler(int signum) {
59   /* Set a "hup my logs" flag */
60   if(signum == SIGHUP)
61     huplogs = 1;
62   else if(signum == SIGTERM)
63     terminate = 1;
64 }
65
66 int join(LogFacility *lf, void *vpfd) {
67   int ret;
68   int fd = *(int *)vpfd;
69   if((ret = SP_join(fd, lf->groupname)) == 0) {
70     if(verbose)
71       fprintf(stderr, "Joined %s.\n", lf->groupname);
72   }
73   return ret;
74 }
75
76 int connectandjoin(SpreadConfiguration *sc, void *uv) {
77   int mbox;
78   int *todo = (int *)uv;
79   char sld[MAX_GROUP_NAME];
80   snprintf(sld, MAX_GROUP_NAME, "sld-%05d", getpid());
81   if(sc->connected && *todo & _TODO_PARANOID_CONNECT) {
82     /* We _think_ we are connected, but want to really check */
83     int i;
84     int retval = 0;
85     struct timeval timeout;
86     fd_set readset, exceptset;
87
88     for(i=0;i<fdsetsize;i++)
89       if(fds[i] == sc)
90         break;
91     if(i>=fdsetsize) {
92       sc->connected = 0;
93     } else {
94       timeout.tv_sec = 0L;
95       timeout.tv_usec = 0L;
96       FD_ZERO(&readset); FD_SET(i, &readset);
97       FD_ZERO(&exceptset); FD_SET(i, &exceptset);
98
99       retval = select(fdsetsize, &readset, NULL, &exceptset, &timeout);
100       if(retval < 0 && errno == EBADF) {
101         sc->connected = 0;
102         fds[i] = NULL;
103       }
104     }
105   }
106   if(sc->connected || SP_connect(config_get_spreaddaemon(sc),
107                                  sld, 1, 0, &mbox,
108                                  sc->private_group) == ACCEPT_SESSION) {
109     if(!sc->connected) {
110       if(verbose)
111         fprintf(stderr, "Successfully connected to spread at %s%c%s\n",
112                 (sc->host)?sc->host:"/tmp",
113                 (sc->host)?':':'/',
114                 sc->port);
115       fds[mbox] = sc;
116     }
117     sc->connected = 1;
118     if(*todo & _TODO_JOIN)
119       config_foreach_logfacility(sc, join, &mbox);
120     return mbox;
121   } else {
122     if(verbose)
123       fprintf(stderr, "Failed connection to spread at %s%c%s\n",
124               (sc->host)?sc->host:"/tmp",
125               (sc->host)?':':'/',
126               sc->port);
127     sc->connected = 0;
128   }
129   return -1; 
130 }
131
132 int paranoid_establish_spread_connections() {
133   int tojoin = _TODO_JOIN | _TODO_PARANOID_CONNECT;
134   return config_foreach_spreadconf(connectandjoin, (void *)&tojoin);
135 }
136 int establish_spread_connections() {
137   int tojoin = _TODO_JOIN;
138   return config_foreach_spreadconf(connectandjoin, (void *)&tojoin);
139 }
140
141 void handle_signals() {
142   if(terminate) {
143     if(extralog) fprintf(stderr, "Received SIGTERM, closing log files.\n");
144     config_close();
145     if(extralog) fprintf(stderr, "Log files closed, exiting.\n");
146     exit(0);
147   }
148   if(huplogs) {
149     huplogs = 0;
150     config_hup();
151   }
152 }
/*
 * Detach from the invoking session with the usual fork / setsid / fork
 * sequence.  Each parent exits with status 0; only the final child returns.
 *
 * A failed fork() used to be indistinguishable from "I am the parent" and
 * made the program exit silently with status 0.  It is now reported and the
 * process exits with status 1.
 */
void daemonize(void) {
  pid_t pid;

  pid = fork();
  if(pid < 0) {
    perror("fork");
    exit(1);
  }
  if(pid > 0) exit(0);

  /* Best effort, as before: report a failure but carry on */
  if(setsid() < 0)
    perror("setsid");

  pid = fork();
  if(pid < 0) {
    perror("fork");
    exit(1);
  }
  if(pid > 0) exit(0);
}
/*
 * Raise the soft RLIMIT_NOFILE limit to the hard limit and return the soft
 * limit that is actually in force afterwards.
 *
 * Differences from the previous version:
 *  - if getrlimit() fails, the struct is no longer read uninitialized;
 *    FD_SETSIZE is returned as a conservative fallback;
 *  - if setrlimit() fails, the unchanged soft limit is returned instead of
 *    the hard limit that was never applied;
 *  - RLIM_INFINITY and values that do not fit in an int are clamped to
 *    INT_MAX instead of being truncated by the conversion.
 */
int getnropen(void) {
  struct rlimit rlim;

  if(getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
    perror("getrlimit");
    return FD_SETSIZE;
  }

  if(rlim.rlim_cur != rlim.rlim_max) {
    struct rlimit raised = rlim;

    raised.rlim_cur = raised.rlim_max;
    /* On failure keep reporting the soft limit we already have */
    if(setrlimit(RLIMIT_NOFILE, &raised) == 0)
      rlim = raised;
  }

  if(rlim.rlim_cur == RLIM_INFINITY || rlim.rlim_cur > (rlim_t)INT_MAX)
    return INT_MAX;
  return (int)rlim.rlim_cur;
}
165
166 int main(int argc, char **argv) {
167   char *configfile = default_configfile;
168   char *message;
169   int getoption, debug = 0;
170   struct sigaction signalaction;
171   sigset_t ourmask;
172         nr_open = getnropen();
173
174   fdsetsize = FD_SETSIZE;
175   fds = (SpreadConfiguration **)malloc(sizeof(SpreadConfiguration *)*
176                                        fdsetsize);
177   memset(fds, 0, sizeof(SpreadConfiguration *)*fdsetsize);
178
179   while((getoption = getopt(argc, argv, "b:c:svxDV")) != -1) {
180     switch(getoption) {
181     case 'b':
182       buffsize = atoi(optarg);
183     case 'c':
184       configfile = optarg;
185       break;
186     case 's':
187       skiplocking = 1;
188       break;
189     case 'v':
190       verbose = 1;
191       break;
192     case 'x':
193       extralog = 1;
194       break;
195     case 'D':
196       debug = 1;
197       break;
198     default:
199       usage(argv[0]);
200     }
201   }
202  
203   /* Read our configuration */
204   if(config_init(configfile)) exit(-1);
205
206   if(buffsize<0) buffsize = 1024*8; /* 8k buffer (like Apache) */
207   message = (char *)malloc(buffsize*sizeof(char));
208
209   if(verbose) {
210     fprintf(stderr, "running spreadlogd as %s\n\tconfigfile:\t\t%s\n\tdebug:\t\t%s\n\tverbose:\t\t%s\n\tlog spread errors:\t%s\n\tbuffer size:\t\t%d\n",
211             argv[0],
212             configfile,
213             (debug)?"YES":"NO",
214             (verbose)?"YES":"NO",
215             (extralog)?"YES":"NO",
216             buffsize);
217   }
218
219   if(!debug) daemonize();
220  
221   /* Set up HUP signal */
222   signalaction.sa_handler = sig_handler;
223   sigemptyset(&signalaction.sa_mask);
224   signalaction.sa_flags = 0;
225   if(sigaction(SIGHUP, &signalaction, NULL)) {
226     fprintf(stderr, "An error occured while registering a SIGHUP handler\n");
227     perror("sigaction");
228   }
229   if(sigaction(SIGTERM, &signalaction, NULL)) {
230     fprintf(stderr, "An error occured while registering a SIGTERM handler\n");
231     perror("sigaction");
232   }
233   sigemptyset(&ourmask);
234   sigaddset(&ourmask, SIGHUP);
235   sigprocmask(SIG_UNBLOCK, &ourmask, NULL);
236
237   /* Connect to spread */
238   while(1) {
239     int fd, tojoin;
240     fd_set readset, exceptset, masterset;
241     sp_time lasttry, thistry, timediff;
242     service service_type;
243     char sender[MAX_GROUP_NAME];
244     char *pmessage;
245     int len, num_groups, endian, logfd;
246     char groups[1][MAX_GROUP_NAME];
247     int16 mess_type;
248 #ifdef DROP_RECV
249     service_type = DROP_RECV;
250 #endif
251
252     establish_spread_connections();
253     FD_ZERO(&masterset);
254     for(fd=0;fd<fdsetsize;fd++) {
255       if(fds[fd]) {
256         fprintf(stderr, "Setting FD: %d\n", fd);
257         FD_SET(fd, &masterset);
258       }
259     }
260     lasttry = E_get_time();
261     while(1) {
262       /* Build out select */
263       struct timeval timeout;
264       int i;
265
266       memcpy(&readset, &masterset, sizeof(fd_set));
267       memcpy(&exceptset, &masterset, sizeof(fd_set));
268       timeout.tv_sec = 1L;
269       timeout.tv_usec = 0L;
270       if(select(fdsetsize, &readset, NULL, &exceptset, &timeout) > 0) {
271         for(fd=0;fd<fdsetsize;fd++)
272           if(FD_ISSET(fd, &readset) || FD_ISSET(fd, &exceptset)) {
273             len = SP_receive(fd, &service_type, sender,
274                              1, &num_groups, groups,
275                              &mess_type, &endian, buffsize, message);
276             /* Handle errors correctly */
277             if(len == ILLEGAL_SESSION || len == CONNECTION_CLOSED ||
278                len == ILLEGAL_MESSAGE || len == BUFFER_TOO_SHORT) {
279               if(extralog) {
280                 fprintf(stderr, "Error receiving from spread:\n\t");
281                 SP_error(len);
282               }
283               /* These are errors that require reestablishing a connection */
284               if(len == ILLEGAL_SESSION || len == CONNECTION_CLOSED) {
285                 /* So, let's try */
286                 SpreadConfiguration *thissc = fds[fd];
287                 int retval;
288
289                 if(extralog) {
290                   fprintf(stderr, "Terminal error closing spread mailbox %d\n",
291                           fd);
292                 }
293                 fds[fd] = NULL;
294                 FD_CLR(fd, &masterset);
295                 tojoin = 1;
296                 thissc->connected = 0;
297                 retval = connectandjoin(thissc, &tojoin);
298                 if(retval >= 0)
299                   FD_SET(retval, &masterset);
300                 else if(extralog)
301                   fprintf(stderr, "Error connecting to spread daemon\n");
302               }
303             } else if(Is_regular_mess(service_type)) {
304               logfd = config_get_fd(fds[fd], groups[0], message);
305               if(logfd<0) continue;
306               pmessage = config_process_message(fds[fd],groups[0], message, &len);
307               write(logfd, pmessage, len);
308 sleep(1);
309             }
310 #ifdef DROP_RECV
311             /* Set DROP_RECV flag if we can */
312             service_type = DROP_RECV;
313 #endif
314           }
315       } else {
316         if(errno == EBADF) {
317           /* Our Spread connection is bad */
318           paranoid_establish_spread_connections();
319         }
320       }
321       handle_signals();
322       thistry = E_get_time();
323       timediff = E_sub_time(thistry, lasttry);
324       if(timediff.sec > 5) {
325         lasttry = thistry;
326         tojoin = 1;
327         config_foreach_spreadconf(connectandjoin, (void *)&tojoin);
328         FD_ZERO(&masterset);
329         for(i=0;i<fdsetsize;i++) {
330           if(fds[i]) FD_SET(i, &masterset);
331         }
332       }
333     }
334   }
335   return -1;
336 }
Note: See TracBrowser for help on using the browser.