root/trunk/spreadlogd.c

Revision 42, 7.5 kB (checked in by jesus, 8 years ago)

function updates and libevent logging

  • Property svn:eol-style set to native
  • Property svn:executable set to *
  • Property svn:keywords set to Author Date Id Revision
Line 
1 /* ======================================================================
2  * Copyright (c) 2000 Theo Schlossnagle
3  * All rights reserved.
4  * The following code was written by Theo Schlossnagle <jesus@omniti.com>
5  * This code was written to facilitate clustered logging via Spread.
6  * More information on Spread can be found at http://www.spread.org/
7  * Please refer to the LICENSE file before using this software.
8  * ======================================================================
9 */
10
11 #include "sld_config.h"
12 #include "module.h"
13
14 #define SPREADLOGD_VERSION "1.5.0"
15
16 #define _TODO_JOIN 1
17 #define _TODO_PARANOID_CONNECT 2
18
19 #define RECONNECT_INTERVAL 5
20
21 extern char *optarg;
22 extern int optind, opterr, optopt;
23
24 int verbose = 0;
25 int extralog = 0;
26 int terminate = 0;
27 int huplogs = 0;
28 int skiplocking = 0;
29 int buffsize = -1;
30 char *module_dir = NULL;
31
32 static char *default_configfile = ETCDIR "/spreadlogd.conf";
33 static int connectandjoin(SpreadConfiguration *sc, void *uv);
34 static void handle_message(int fd, short event, void *arg);
35 static int paranoid_establish_spread_connections();
36
37 void usage(char *progname) {
38   fprintf(stderr, "%s\t\tVERSION: %s\n \
39 \t-c configfile\t\t[default " ETCDIR "/spreadlogd.conf]\n \
40 \t-s\t\t\tskip locking (flock) files (NOT RECOMMENDED)\n \
41 \t-v\t\t\tverbose mode\n \
42 \t-x\t\t\tlog errors talking with spread\n \
43 \t-D\t\t\tdo not daemonize (debug)\n \
44 \t-V\t\t\tshow version information\n", progname, SPREADLOGD_VERSION);
45   exit(1);
46 }
47
48 void sig_handler(int fd, short event, void *arg) {
49   struct event *signal = arg;
50   if(EVENT_SIGNAL(signal) == SIGHUP)
51     huplogs = 1;
52   else if(EVENT_SIGNAL(signal) == SIGTERM)
53     terminate = 1;
54 }
55
56 int join(LogFacility *lf, void *vpfd) {
57   int ret;
58   int fd = *(int *)vpfd;
59   if((ret = SP_join(fd, lf->groupname)) == 0) {
60     if(verbose)
61       fprintf(stderr, "Joined %s.\n", lf->groupname);
62   }
63   return ret;
64 }
65
66 static int connectandjoin(SpreadConfiguration *sc, void *uv) {
67   int mbox, err;
68   int *todo = (int *)uv;
69   char sld[MAX_GROUP_NAME];
70   snprintf(sld, MAX_GROUP_NAME, "sld-%05d", getpid());
71   if(sc->connected ||
72      (err = SP_connect(config_get_spreaddaemon(sc),
73                        sld, 1, 0, &mbox,
74                        sc->private_group)) == ACCEPT_SESSION) {
75     if(!sc->connected) {
76       if(verbose)
77         fprintf(stderr, "Successfully connected to spread at %s%c%s on %d\n",
78                 (sc->host)?sc->host:"/tmp",
79                 (sc->host)?':':'/',
80                 sc->port, mbox);
81       sc->fd = mbox;
82     }
83     sc->connected = 1;
84     if(*todo & _TODO_JOIN)
85       config_foreach_logfacility(sc, join, &mbox);
86     event_set(&sc->event, mbox, EV_READ|EV_PERSIST, handle_message, sc);
87     event_add(&sc->event, NULL);
88     return mbox;
89   }
90   if(verbose)
91     fprintf(stderr, "Failed connection to spread at %s%c%s\n",
92             (sc->host)?sc->host:"/tmp",
93             (sc->host)?':':'/',
94             sc->port);
95   sc->connected = 0;
96   return -1; 
97 }
98
99 static void handle_signals() {
100   if(terminate) {
101     if(extralog) fprintf(stderr, "Received SIGTERM, closing log files.\n");
102     config_close();
103     if(extralog) fprintf(stderr, "Log files closed, exiting.\n");
104     exit(0);
105   }
106   if(huplogs) {
107     huplogs = 0;
108     config_hup();
109   }
110 }
111
112 void daemonize(void) {
113   if(fork()!=0) exit(0);
114   setsid();
115   if(fork()!=0) exit(0);
116 }
117
118 static void handle_message(int fd, short event, void *arg) {
119   SpreadConfiguration *sc = (SpreadConfiguration *)arg;
120   service service_type;
121   char sender[MAX_GROUP_NAME];
122   char *pmessage;
123   int len, num_groups, endian, logfd;
124   char groups[1][MAX_GROUP_NAME];
125   int16 mess_type;
126 #ifdef DROP_RECV
127   service_type = DROP_RECV;
128 #endif
129   char *message = NULL;
130
131   message = (char *)malloc(buffsize*sizeof(char) + 1);
132
133   len = SP_receive(sc->fd, &service_type, sender,
134                    1, &num_groups, groups,
135                    &mess_type, &endian, buffsize, message);
136   /* Handle errors correctly */
137   if(len == ILLEGAL_SESSION || len == CONNECTION_CLOSED ||
138      len == ILLEGAL_MESSAGE || len == BUFFER_TOO_SHORT) {
139     if(extralog) {
140       fprintf(stderr, "Error receiving from spread:\n\t");
141       SP_error(len);
142     }
143     /* These are errors that require reestablishing a connection */
144     if(len == ILLEGAL_SESSION || len == CONNECTION_CLOSED) {
145       /* So, let's try */
146       int retval, tojoin;
147
148       if(extralog) {
149         fprintf(stderr, "Error closing spread mailbox %d\n", sc->fd);
150       }
151       SP_disconnect(sc->fd);
152       event_del(&sc->event);
153       sc->connected = 0;
154       tojoin = _TODO_JOIN;
155       retval = connectandjoin(sc, &tojoin);
156     }
157   }
158   else if(Is_regular_mess(service_type)) {
159     logfd = config_get_fd(sc, groups[0], message);
160     message[len] = '\0';
161     pmessage = config_process_message(sc ,groups[0], message, &len);
162     config_do_external_module(sc, sender, groups[0], message);
163 #ifdef PERL
164     config_do_external_perl(sc, sender, groups[0], message);
165 #endif
166 #ifdef PYTHON
167     config_do_external_python(sc, sender, groups[0], message);
168 #endif
169     if(logfd>=0) write(logfd, pmessage, len);
170   }
171   else if(len < 0) {
172     if(errno == EBADF) {
173       /* Our Spread connection is bad */
174       paranoid_establish_spread_connections();
175     }
176   }
177   handle_signals();
178   if(message) free(message);
179 }
180
181 static int paranoid_establish_spread_connections() {
182   int tojoin = _TODO_JOIN | _TODO_PARANOID_CONNECT;
183   return config_foreach_spreadconf(connectandjoin, (void *)&tojoin);
184 }
185 static int establish_spread_connections() {
186   int tojoin = _TODO_JOIN;
187   return config_foreach_spreadconf(connectandjoin, (void *)&tojoin);
188 }
189
190 static void reconnect_spread(int fd, short event, void *arg) {
191   struct timeval tv;
192   struct event *reconn = (struct event *)arg;
193   int tojoin;
194
195   tojoin = _TODO_JOIN;
196   config_foreach_spreadconf(connectandjoin, (void *)&tojoin);
197
198   timerclear(&tv);
199   tv.tv_sec = RECONNECT_INTERVAL;
200   event_add(reconn, &tv);
201 }
202
203 static void stderr_debug(int s, const char *m) {
204   fprintf(stderr, "[%d] %s\n", s, m);
205 }
206 int main(int argc, char **argv) {
207   char *configfile = default_configfile;
208   struct event signal_hup, signal_term, reconn;
209   int getoption, debug = 0, rv;
210   struct timeval tv;
211
212   event_init();
213   module_init();
214
215   while((getoption = getopt(argc, argv, "b:c:svxDV")) != -1) {
216     switch(getoption) {
217     case 'b':
218       buffsize = atoi(optarg);
219     case 'c':
220       configfile = optarg;
221       break;
222     case 's':
223       skiplocking = 1;
224       break;
225     case 'v':
226       verbose = 1;
227       break;
228     case 'x':
229       extralog = 1;
230       break;
231     case 'D':
232       debug = 1;
233       break;
234     default:
235       usage(argv[0]);
236     }
237   }
238  
239   /* Read our configuration */
240   if(config_init(configfile)) exit(-1);
241   if(buffsize<0) buffsize = 1024*8; /* 8k buffer (like Apache) */
242
243   if(verbose) {
244     fprintf(stderr, "running spreadlogd as %s\n\tconfigfile:\t\t%s\n\tdebug:\t\t%s\n\tverbose:\t\t%s\n\tlog spread errors:\t%s\n\tbuffer size:\t\t%d\n",
245             argv[0],
246             configfile,
247             (debug)?"YES":"NO",
248             (verbose)?"YES":"NO",
249             (extralog)?"YES":"NO",
250             buffsize);
251   }
252
253   if(!debug) daemonize();
254   else event_set_log_callback(stderr_debug);
255
256   /* SIGHUP and SIGTERM */
257   event_set(&signal_hup, SIGHUP, EV_SIGNAL|EV_PERSIST, sig_handler,
258             &signal_hup);
259   event_add(&signal_hup, NULL);
260   event_set(&signal_term, SIGTERM, EV_SIGNAL|EV_PERSIST, sig_handler,
261             &signal_term);
262   event_add(&signal_term, NULL);
263   /* Periodic reconnect */
264   evtimer_set(&reconn, reconnect_spread, &reconn);
265   timerclear(&tv);
266   tv.tv_sec = RECONNECT_INTERVAL;
267   event_add(&reconn, &tv);
268  
269   establish_spread_connections();
270   rv = event_dispatch();
271   printf("event_dispatch -> %d [%d]\n", rv, errno);
272   config_cleanup();
273   return 0;
274 }
275
Note: See TracBrowser for help on using the browser.