root/tags/v_1_4_2/spreadlogd.c

Revision 16, 7.6 kB (checked in by jesus, 13 years ago)

Update for compile warnings and bug in skiplists with uninitialized variable.

  • Property svn:eol-style set to native
  • Property svn:executable set to *
  • Property svn:keywords set to Author Date Id Revision
Line 
1 /* ======================================================================
2  * Copyright (c) 2000 Theo Schlossnagle
3  * All rights reserved.
4  * The following code was written by Theo Schlossnagle <jesus@omniti.com>
5  * This code was written to facilitate clustered logging via Spread.
6  * More information on Spread can be found at http://www.spread.org/
7  * Please refer to the LICENSE file before using this software.
8  * ======================================================================
9 */
10
11 #include <stdio.h>
12 #include <stdlib.h>
13 #include <string.h>
14 #include <signal.h>
15 #include <unistd.h>
16 #include <sys/time.h>
17 #include <sys/resource.h>
18 #include <sp.h>
19
20 #include "config.h"
21
22 #define SPREADLOGD_VERSION "1.4.2"
23
24 extern char *optarg;
25 extern int optind, opterr, optopt;
26
27 int verbose = 0;
28 int extralog = 0;
29 int terminate = 0;
30 int huplogs = 0;
31 int skiplocking = 0;
32 int buffsize = -1;
33 SpreadConfiguration **fds;
34 int fdsetsize;
35 int nr_open;
36
37 static char *default_configfile = "/etc/spreadlogd.conf";
38
39 void usage(char *progname) {
40   fprintf(stderr, "%s\t\tVERSION: %s\n \
41 \t-c configfile\t\t[default /etc/spreadlogd.conf]\n \
42 \t-s\t\t\tskip locking (flock) files (NOT RECOMMENDED)\n \
43 \t-v\t\t\tverbose mode\n \
44 \t-x\t\t\tlog errors talking with spread\n \
45 \t-D\t\t\tdo not daemonize (debug)\n \
46 \t-V\t\t\tshow version information\n", progname, SPREADLOGD_VERSION);
47   exit(1);
48 }
49
50 void sig_handler(int signum) {
51   /* Set a "hup my logs" flag */
52   if(signum == SIGHUP)
53     huplogs = 1;
54   else if(signum == SIGTERM)
55     terminate = 1;
56 }
57
58 int join(LogFacility *lf, void *vpfd) {
59   int ret;
60   int fd = *(int *)vpfd;
61   if((ret = SP_join(fd, lf->groupname)) == 0) {
62     if(verbose)
63       fprintf(stderr, "Joined %s.\n", lf->groupname);
64   }
65   return ret;
66 }
67
68 int connectandjoin(SpreadConfiguration *sc, void *uv) {
69   int mbox;
70   int *tojoin = (int *)uv;
71   char sld[MAX_GROUP_NAME];
72   snprintf(sld, MAX_GROUP_NAME, "sld-%05d", getpid());
73   if(sc->connected || SP_connect(config_get_spreaddaemon(sc),
74                                  sld, 1, 0, &mbox,
75                                  sc->private_group) == ACCEPT_SESSION) {
76     if(!sc->connected) {
77       if(verbose)
78         fprintf(stderr, "Successfully connected to spread at %s%c%s\n",
79                 (sc->host)?sc->host:"/tmp",
80                 (sc->host)?':':'/',
81                 sc->port);
82       fds[mbox] = sc;
83     }
84     sc->connected = 1;
85     if(*tojoin)
86       config_foreach_logfacility(sc, join, &mbox);
87     return mbox;
88   } else {
89     if(verbose)
90       fprintf(stderr, "Failed connection to spread at %s%c%s\n",
91               (sc->host)?sc->host:"/tmp",
92               (sc->host)?':':'/',
93               sc->port);
94     sc->connected = 0;
95   }
96   return -1; 
97 }
98
99 int establish_spread_connections() {
100   int tojoin = 1;
101   return config_foreach_spreadconf(connectandjoin, (void *)&tojoin);
102 }
103
104 void handle_signals() {
105   if(terminate) {
106     if(extralog) fprintf(stderr, "Received SIGTERM, closing log files.\n");
107     config_close();
108     if(extralog) fprintf(stderr, "Log files closed, exiting.\n");
109     exit(0);
110   }
111   if(huplogs) {
112     huplogs = 0;
113     config_hup();
114   }
115 }
116 void daemonize(void) {
117   if(fork()!=0) exit(0);
118   setsid();
119   if(fork()!=0) exit(0);
120 }
/*
 * Raise the soft RLIMIT_NOFILE limit to the hard limit and report the soft
 * limit that is actually in effect afterwards.
 *
 * Returns the effective soft limit.  If setrlimit() is refused, the
 * previous soft limit is returned instead of the unapplied hard limit.
 * If the limit cannot be queried, is unlimited, or does not fit in an
 * int, sysconf(_SC_OPEN_MAX) is returned as a fallback.
 */
int getnropen(void) {
  struct rlimit rlim;
  int limit;

  if(getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
    perror("getrlimit");
    return (int)sysconf(_SC_OPEN_MAX);
  }

  if(rlim.rlim_cur != rlim.rlim_max) {
    struct rlimit raised = rlim;

    raised.rlim_cur = raised.rlim_max;
    /* Only adopt the higher value when the kernel accepted it. */
    if(setrlimit(RLIMIT_NOFILE, &raised) == 0)
      rlim = raised;
  }

  limit = (int)rlim.rlim_cur;
  /* Guard against "unlimited" and against values truncated by the cast. */
  if(rlim.rlim_cur == RLIM_INFINITY || limit < 0 ||
     (rlim_t)limit != rlim.rlim_cur)
    return (int)sysconf(_SC_OPEN_MAX);
  return limit;
}
128
129 int main(int argc, char **argv) {
130 #ifdef SPREAD_VERSION
131   int mver, miver, pver;
132 #endif
133   char *configfile = default_configfile;
134   char *message;
135   int getoption, debug = 0;
136   struct sigaction signalaction;
137   sigset_t ourmask;
138         nr_open = getnropen();
139
140   fdsetsize = getdtablesize();
141   fds = (SpreadConfiguration **)malloc(sizeof(SpreadConfiguration *)*
142                                        fdsetsize);
143   memset(fds, 0, sizeof(SpreadConfiguration *)*fdsetsize);
144
145   while((getoption = getopt(argc, argv, "b:c:svxDV")) != -1) {
146     switch(getoption) {
147     case 'b':
148       buffsize = atoi(optarg);
149     case 'c':
150       configfile = optarg;
151       break;
152     case 's':
153       skiplocking = 1;
154       break;
155     case 'v':
156       verbose = 1;
157       break;
158     case 'x':
159       extralog = 1;
160       break;
161     case 'D':
162       debug = 1;
163       break;
164     default:
165       usage(argv[0]);
166     }
167   }
168  
169   /* Read our configuration */
170   if(config_init(configfile)) exit(-1);
171
172   if(buffsize<0) buffsize = 1024*8; /* 8k buffer (like Apache) */
173   message = (char *)malloc(buffsize*sizeof(char));
174
175   if(verbose) {
176     fprintf(stderr, "running spreadlogd as %s\n\tconfigfile:\t\t%s\n\tdebug:\t\t%s\n\tverbose:\t\t%s\n\tlog spread errors:\t%s\n\tbuffer size:\t\t%d\n",
177             argv[0],
178             configfile,
179             (debug)?"YES":"NO",
180             (verbose)?"YES":"NO",
181             (extralog)?"YES":"NO",
182             buffsize);
183   }
184
185   if(!debug) daemonize();
186  
187   /* Set up HUP signal */
188   signalaction.sa_handler = sig_handler;
189   sigemptyset(&signalaction.sa_mask);
190   signalaction.sa_flags = 0;
191   if(sigaction(SIGHUP, &signalaction, NULL)) {
192     fprintf(stderr, "An error occured while registering a SIGHUP handler\n");
193     perror("sigaction");
194   }
195   if(sigaction(SIGTERM, &signalaction, NULL)) {
196     fprintf(stderr, "An error occured while registering a SIGTERM handler\n");
197     perror("sigaction");
198   }
199   sigemptyset(&ourmask);
200   sigaddset(&ourmask, SIGHUP);
201   sigprocmask(SIG_UNBLOCK, &ourmask, NULL);
202
203   /* Connect to spread */
204   while(1) {
205     int fd, tojoin;
206     fd_set readset, exceptset, masterset;
207     sp_time lasttry, thistry, timediff;
208     service service_type;
209     char sender[MAX_GROUP_NAME];
210     char *pmessage;
211     int len, num_groups, endian, logfd;
212     char groups[1][MAX_GROUP_NAME];
213     int16 mess_type;
214 #ifdef DROP_RECV
215     service_type = DROP_RECV;
216 #endif
217
218     establish_spread_connections();
219     FD_ZERO(&masterset);
220     for(fd=0;fd<fdsetsize;fd++) {
221       if(fds[fd]) {
222         fprintf(stderr, "Setting FD: %d\n", fd);
223         FD_SET(fd, &masterset);
224       }
225     }
226     lasttry = E_get_time();
227     while(1) {
228       /* Build out select */
229       struct timeval timeout;
230       int i;
231
232       readset = masterset;
233       exceptset = masterset;
234       timeout.tv_sec = 1L;
235       timeout.tv_usec = 0L;
236       if(select(fdsetsize, &readset, NULL, &exceptset, &timeout) > 0) {
237         for(fd=0;fd<fdsetsize;fd++)
238           if(FD_ISSET(fd, &readset) || FD_ISSET(fd, &exceptset)) {
239             len = SP_receive(fd, &service_type, sender,
240                              1, &num_groups, groups,
241                              &mess_type, &endian, buffsize, message);
242             /* Handle errors correctly */
243             if(len == ILLEGAL_SESSION || len == CONNECTION_CLOSED ||
244                len == ILLEGAL_MESSAGE || len == BUFFER_TOO_SHORT) {
245               if(extralog) {
246                 fprintf(stderr, "Error receiving from spread:\n\t");
247                 SP_error(len);
248               }
249               /* These are errors that require reestablishing a connection */
250               if(len == ILLEGAL_SESSION || len == CONNECTION_CLOSED) {
251                 /* So, let's try */
252                 SpreadConfiguration *thissc = fds[fd];
253                 int retval;
254
255                 if(extralog) {
256                   fprintf(stderr, "Terminal error closing spread mailbox %d\n",
257                           fd);
258                 }
259                 fds[fd] = NULL;
260                 FD_CLR(fd, &masterset);
261                 tojoin = 1;
262                 thissc->connected = 0;
263                 retval = connectandjoin(thissc, &tojoin);
264                 if(retval >= 0)
265                   FD_SET(retval, &masterset);
266                 else if(extralog)
267                   fprintf(stderr, "Error connecting to spread daemon\n");
268               }
269             } else if(Is_regular_mess(service_type)) {
270               logfd = config_get_fd(fds[fd], groups[0], message);
271               if(logfd<0) continue;
272               pmessage = config_process_message(fds[fd],groups[0], message, &len);
273               write(logfd, pmessage, len);
274             }
275 #ifdef DROP_RECV
276             /* Set DROP_RECV flag if we can */
277             service_type = DROP_RECV;
278 #endif
279           }
280       }
281       handle_signals();
282       thistry = E_get_time();
283       timediff = E_sub_time(thistry, lasttry);
284       if(timediff.sec > 5) {
285         lasttry = thistry;
286         tojoin = 1;
287         config_foreach_spreadconf(connectandjoin, (void *)&tojoin);
288         FD_ZERO(&masterset);
289         for(i=0;i<fdsetsize;i++) {
290           if(fds[i]) FD_SET(i, &masterset);
291         }
292       }
293     }
294   }
295   return -1;
296 }
Note: See TracBrowser for help on using the browser.