Changeset b62cf2be087943dcb29b6e068bd4262862fcb17d
- Timestamp:
- 12/17/07 05:43:26 (5 years ago)
- git-parent:
- Files:
-
- configure.in (modified) (1 diff)
- src/eventer/eventer.c (modified) (1 diff)
- src/eventer/eventer.h (modified) (2 diffs)
- src/eventer/eventer_POSIX_fd_opset.c (modified) (1 diff)
- src/eventer/eventer_kqueue_impl.c (modified) (6 diffs)
- src/noit_defines.h (modified) (1 diff)
- src/noitd.c (modified) (2 diffs)
- src/utils/Makefile.in (modified) (1 diff)
- src/utils/noit_skiplist.c (added)
- src/utils/noit_skiplist.h (added)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
configure.in
rcd1ab55 rb62cf2b 12 12 AC_PATH_PROGS(PERL, perl) 13 13 AC_SUBST(PERL) 14 15 if test "x$CC" = "xgcc" ; then 16 CFLAGS="$CFLAGS -g -Wall" 17 fi 14 18 15 19 # Checks for data types src/eventer/eventer.c
r01751d3 rb62cf2b 8 8 e->opset = eventer_POSIX_fd_opset; 9 9 return e; 10 } 11 12 int eventer_timecompare(void *av, void *bv) { 13 /* Herein we avoid equality. This function is only used as a comparator 14 * for a heap of timed events. If they are equal, b is considered less 15 * just to maintain an order (despite it not being stable). 16 */ 17 eventer_t a = (eventer_t)av; 18 eventer_t b = (eventer_t)bv; 19 if(a->whence.tv_sec < b->whence.tv_sec) return -1; 20 if(a->whence.tv_sec == b->whence.tv_sec && 21 a->whence.tv_usec < b->whence.tv_usec) return -1; 22 return 1; 10 23 } 11 24 src/eventer/eventer.h
r01751d3 rb62cf2b 43 43 struct _event { 44 44 eventer_func_t callback; 45 struct timeval *whence;45 struct timeval whence; 46 46 int fd; 47 47 int mask; … … 52 52 API_EXPORT(eventer_t) eventer_alloc(); 53 53 API_EXPORT(void) eventer_free(eventer_t); 54 API_EXPORT(int) eventer_timecompare(void *a, void *b); 54 55 55 56 typedef struct _eventer_impl { src/eventer/eventer_POSIX_fd_opset.c
r01751d3 rb62cf2b 8 8 9 9 #include <sys/socket.h> 10 #include <unistd.h> 10 11 11 12 static int src/eventer/eventer_kqueue_impl.c
rcd1ab55 rb62cf2b 7 7 #include "eventer/eventer.h" 8 8 #include "utils/noit_atomic.h" 9 9 #include "utils/noit_skiplist.h" 10 11 #include <errno.h> 12 #include <stdio.h> 13 #include <stdlib.h> 10 14 #include <sys/event.h> 11 15 #include <pthread.h> 12 13 int maxfds; 14 struct { 16 #include <assert.h> 17 18 static struct timeval __max_sleeptime = { 0, 200000 }; /* 200ms */ 19 static int maxfds; 20 static struct { 15 21 eventer_t e; 16 22 pthread_t executor; 17 23 noit_spinlock_t lock; 18 } **master_fds; 19 20 int kqueue_fd; 24 } *master_fds = NULL; 25 26 typedef enum { EV_OWNED, EV_ALREADY_OWNED } ev_lock_state_t; 27 28 static ev_lock_state_t 29 acquire_master_fd(int fd) { 30 if(noit_spinlock_trylock(&master_fds[fd].lock)) { 31 master_fds[fd].executor = pthread_self(); 32 return EV_OWNED; 33 } 34 if(pthread_equal(master_fds[fd].executor, pthread_self())) { 35 return EV_ALREADY_OWNED; 36 } 37 noit_spinlock_lock(&master_fds[fd].lock); 38 master_fds[fd].executor = pthread_self(); 39 return EV_OWNED; 40 } 41 static void 42 release_master_fd(int fd, ev_lock_state_t as) { 43 if(as == EV_OWNED) { 44 memset(&master_fds[fd].executor, 0, sizeof(master_fds[fd].executor)); 45 noit_spinlock_unlock(&master_fds[fd].lock); 46 } 47 } 48 49 static pthread_t master_thread; 50 static int kqueue_fd = -1; 21 51 typedef struct kqueue_setup { 22 52 struct kevent *__ke_vec; 23 53 unsigned int __ke_vec_a; 24 54 unsigned int __ke_vec_used; 25 } * kqs_t;55 } *kqs_t; 26 56 27 57 static pthread_mutex_t kqs_lock; 58 static pthread_mutex_t te_lock; 28 59 static kqs_t master_kqs = NULL; 29 60 static pthread_key_t kqueue_setup_key; 61 static noit_skiplist *timed_events = NULL; 30 62 #define KQUEUE_DECL kqs_t kqs 31 63 #define KQUEUE_SETUP kqs = (kqs_t) pthread_getspecific(kqueue_setup_key) … … 34 66 #define ke_vec_used kqs->__ke_vec_used 35 67 68 static void kqs_init(kqs_t kqs) { 69 enum { initial_alloc = 64 }; 70 ke_vec_a = initial_alloc; 71 ke_vec = (struct kevent *) malloc(ke_vec_a * sizeof (struct kevent)); 72 } 36 73 static void 37 74 ke_change (register int const ident, … … 39 76 register int const flags, 40 77 register void *const udata) { 41 enum { initial_alloc = 64 };42 78 register struct kevent *kep; 43 79 KQUEUE_DECL; 44 45 80 KQUEUE_SETUP; 81 if(!kqs) kqs = master_kqs; 82 83 if(kqs == master_kqs) pthread_mutex_lock(&kqs_lock); 46 84 if (!ke_vec_a) { 47 ke_vec_a = initial_alloc; 48 ke_vec = (struct kevent *) malloc(ke_vec_a * sizeof (struct kevent)); 85 kqs_init(kqs); 49 86 } 50 87 else if (ke_vec_used == ke_vec_a) { … … 56 93 57 94 EV_SET(kep, ident, filter, flags, 0, 0, udata); 95 if(kqs == master_kqs) pthread_mutex_unlock(&kqs_lock); 58 96 } 59 97 60 98 static int eventer_kqueue_impl_init() { 61 99 struct rlimit rlim; 100 master_thread = pthread_self(); 62 101 kqueue_fd = kqueue(); 63 102 if(kqueue_fd == -1) { … … 65 104 } 66 105 pthread_mutex_init(&kqs_lock, NULL); 106 pthread_mutex_init(&te_lock, NULL); 107 pthread_key_create(&kqueue_setup_key, NULL); 67 108 master_kqs = calloc(1, sizeof(*master_kqs)); 109 kqs_init(master_kqs); 68 110 getrlimit(RLIMIT_NOFILE, &rlim); 69 111 maxfds = rlim.rlim_cur; 70 112 master_fds = calloc(maxfds, sizeof(*master_fds)); 113 timed_events = calloc(1, sizeof(*timed_events)); 114 noit_skiplist_init(timed_events); 115 noit_skiplist_set_compare(timed_events, 116 eventer_timecompare, eventer_timecompare); 117 noit_skiplist_add_index(timed_events, 118 noit_compare_voidptr, noit_compare_voidptr); 71 119 return 0; 72 120 } … … 75 123 } 76 124 static void eventer_kqueue_impl_add(eventer_t e) { 125 assert(e->mask); 126 ev_lock_state_t lockstate; 127 128 /* Timed events are simple */ 129 if(e->mask == EVENTER_TIMER) { 130 pthread_mutex_lock(&te_lock); 131 noit_skiplist_insert(timed_events, e); 132 pthread_mutex_unlock(&te_lock); 133 return; 134 } 135 136 /* file descriptor event */ 137 lockstate = acquire_master_fd(e->fd); 138 master_fds[e->fd].e = e; 139 if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION)) 140 ke_change(e->fd, EVFILT_READ, EV_ADD | EV_ENABLE, e); 141 if(e->mask & (EVENTER_WRITE)) 142 ke_change(e->fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e); 143 release_master_fd(e->fd, lockstate); 77 144 } 78 145 static void eventer_kqueue_impl_remove(eventer_t e) { 146 if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) { 147 ev_lock_state_t lockstate; 148 lockstate = acquire_master_fd(e->fd); 149 if(e == master_fds[e->fd].e) { 150 master_fds[e->fd].e = NULL; 151 if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION)) 152 ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e); 153 if(e->mask & (EVENTER_WRITE)) 154 ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e); 155 } 156 release_master_fd(e->fd, lockstate); 157 } 158 else if(e->mask & EVENTER_TIMER) { 159 pthread_mutex_lock(&te_lock); 160 noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr); 161 pthread_mutex_unlock(&te_lock); 162 } 163 else { 164 abort(); 165 } 79 166 } 80 167 static void eventer_kqueue_impl_update(eventer_t e) { 168 if(e->mask & EVENTER_TIMER) { 169 pthread_mutex_lock(&te_lock); 170 noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr); 171 noit_skiplist_insert(timed_events, e); 172 pthread_mutex_unlock(&te_lock); 173 return; 174 } 175 ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e); 176 ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e); 177 if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION)) 178 ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e); 179 if(e->mask & (EVENTER_WRITE)) 180 ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e); 81 181 } 82 182 static eventer_t eventer_kqueue_impl_remove_fd(int fd) { 183 eventer_t eiq = NULL; 184 ev_lock_state_t lockstate; 185 if(master_fds[fd].e) { 186 lockstate = acquire_master_fd(fd); 187 eiq = master_fds[fd].e; 188 master_fds[fd].e = NULL; 189 if(eiq->mask & (EVENTER_READ | EVENTER_EXCEPTION)) 190 ke_change(fd, EVFILT_READ, EV_DELETE | EV_DISABLE, eiq); 191 if(eiq->mask & (EVENTER_WRITE)) 192 ke_change(fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, eiq); 193 release_master_fd(fd, lockstate); 194 } 195 return eiq; 83 196 } 84 197 static void eventer_kqueue_impl_loop() { 198 int is_master_thread = 0; 199 pthread_t self; 200 KQUEUE_DECL; 201 KQUEUE_SETUP; 202 203 self = pthread_self(); 204 if(pthread_equal(self, master_thread)) is_master_thread = 1; 205 206 if(!kqs) { 207 kqs = calloc(1, sizeof(*kqs)); 208 kqs_init(kqs); 209 } 210 pthread_setspecific(kqueue_setup_key, kqs); 211 while(1) { 212 struct timeval __now, __sleeptime; 213 struct timespec __kqueue_sleeptime; 214 int fd_cnt = 0; 215 int max_timed_events_to_process; 216 int newmask; 217 218 __sleeptime = __max_sleeptime; 219 220 /* Handle timed events... 221 * we could be multithreaded, so if we pop forever we could starve 222 * ourselves. */ 223 max_timed_events_to_process = timed_events->size; 224 while(max_timed_events_to_process-- > 0) { 225 eventer_t timed_event; 226 227 gettimeofday(&__now, NULL); 228 229 pthread_mutex_lock(&te_lock); 230 /* Peek at our next timed event, if should fire, pop it. 231 * otherwise we noop and NULL it out to break the loop. */ 232 timed_event = noit_skiplist_peek(timed_events); 233 if(timed_event) { 234 if(compare_timeval(timed_event->whence, __now) < 0) { 235 timed_event = noit_skiplist_pop(timed_events, NULL); 236 } 237 else { 238 sub_timeval(timed_event->whence, __now, &__sleeptime); 239 timed_event = NULL; 240 } 241 } 242 pthread_mutex_unlock(&te_lock); 243 if(timed_event == NULL) break; 244 245 /* Make our call */ 246 newmask = timed_event->callback(timed_event, EVENTER_TIMER, 247 timed_event->closure, &__now); 248 if(newmask) 249 eventer_add(timed_event); 250 else 251 eventer_free(timed_event); 252 } 253 254 if(compare_timeval(__max_sleeptime, __sleeptime) < 0) { 255 /* we exceed our configured maximum, set it down */ 256 memcpy(&__sleeptime, &__max_sleeptime, sizeof(__sleeptime)); 257 } 258 259 /* If we're the master, we need to lock the master_kqs and make mods */ 260 if(master_kqs->__ke_vec_used) { 261 struct timespec __zerotime = { 0, 0 }; 262 pthread_mutex_lock(&kqs_lock); 263 fd_cnt = kevent(kqueue_fd, 264 master_kqs->__ke_vec, master_kqs->__ke_vec_used, 265 NULL, 0, 266 &__zerotime); 267 if(fd_cnt < 0) { 268 fprintf(stderr, "kevent: %s\n", strerror(errno)); 269 } 270 master_kqs->__ke_vec_used = 0; 271 pthread_mutex_unlock(&kqs_lock); 272 } 273 274 /* Now we move on to our fd-based events */ 275 __kqueue_sleeptime.tv_sec = __sleeptime.tv_sec; 276 __kqueue_sleeptime.tv_nsec = __sleeptime.tv_usec * 1000; 277 fd_cnt = kevent(kqueue_fd, ke_vec, ke_vec_used, 278 ke_vec, ke_vec_a, 279 &__kqueue_sleeptime); 280 ke_vec_used = 0; 281 if(fd_cnt < 0) { 282 fprintf(stderr, "kevent: %s\n", strerror(errno)); 283 } 284 else { 285 int idx; 286 for(idx = 0; idx < fd_cnt; idx++) { 287 ev_lock_state_t lockstate; 288 struct kevent *ke; 289 eventer_t e; 290 int fd, evmask, oldmask; 291 292 ke = &ke_vec[idx]; 293 e = (eventer_t)ke->udata; 294 fd = ke->ident; 295 assert(e == master_fds[fd].e); 296 lockstate = acquire_master_fd(fd); 297 assert(lockstate == EV_OWNED); 298 299 evmask = 0; 300 if(ke->filter == EVFILT_READ) evmask = EVENTER_READ; 301 if(ke->filter == EVFILT_WRITE) evmask = EVENTER_WRITE; 302 gettimeofday(&__now, NULL); 303 oldmask = e->mask; 304 newmask = e->callback(e, evmask, e->closure, &__now); 305 306 if(newmask) { 307 /* toggle the read bits if needed */ 308 if(newmask & (EVENTER_READ | EVENTER_EXCEPTION)) { 309 if(!(oldmask & (EVENTER_READ | EVENTER_EXCEPTION))) 310 ke_change(fd, EVFILT_READ, EV_ADD | EV_ENABLE, e); 311 } 312 else if(oldmask & (EVENTER_READ | EVENTER_EXCEPTION)) 313 ke_change(fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e); 314 315 /* toggle the write bits if needed */ 316 if(newmask & EVENTER_WRITE) { 317 if(!(oldmask & EVENTER_WRITE)) 318 ke_change(fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e); 319 } 320 else if(oldmask & EVENTER_WRITE) 321 ke_change(fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e); 322 323 /* Set our mask */ 324 e->mask = newmask; 325 } 326 else { 327 eventer_free(e); 328 } 329 release_master_fd(fd, lockstate); 330 } 331 } 332 } 85 333 } 86 334 src/noit_defines.h
r01751d3 rb62cf2b 6 6 #define API_EXPORT(type) extern type 7 7 8 static inline int compare_timeval(struct timeval a, struct timeval b) { 9 if (a.tv_sec < b.tv_sec) return -1; 10 if (a.tv_sec > b.tv_sec) return 1; 11 if (a.tv_usec < b.tv_usec) return -1; 12 if (a.tv_usec > b.tv_usec) return 1; 13 return 0; 14 } 15 16 static inline void sub_timeval(struct timeval a, struct timeval b, 17 struct timeval *out) 18 { 19 out->tv_usec = a.tv_usec - b.tv_usec; 20 if (out->tv_usec < 0L) { 21 a.tv_sec--; 22 out->tv_usec += 1000000L; 23 } 24 out->tv_sec = a.tv_sec - b.tv_sec; 25 if (out->tv_sec < 0L) { 26 out->tv_sec++; 27 out->tv_usec -= 1000000L; 28 } 29 } 30 8 31 #endif src/noitd.c
rcd1ab55 rb62cf2b 5 5 #include <stdlib.h> 6 6 7 int stdin_handler(eventer_t e, int mask, void *closure, struct timeval *now) { 8 fprintf(stderr, "in stdin_handler:\n"); 9 return EVENTER_READ; 10 } 11 void stdin_sample() { 12 eventer_t e; 13 e = eventer_alloc(); 14 e->fd = 0; 15 e->mask = EVENTER_READ; 16 e->callback = stdin_handler; 17 eventer_add(e); 18 } 7 19 int main(int argc, char **argv) { 8 20 if(eventer_choose("kqueue") == -1) { … … 14 26 exit(-1); 15 27 } 28 29 stdin_sample(); 30 eventer_loop(); 16 31 return 0; 17 32 } src/utils/Makefile.in
r01751d3 rb62cf2b 10 10 top_srcdir=@top_srcdir@ 11 11 12 OBJS=noit_hash.o 12 OBJS=noit_hash.o noit_skiplist.o 13 13 14 14 all: libnoit_utils.a
