root/trunk/Sniffer-Postgres/lib/Sniffer/Postgres.pm

Revision 92, 13.4 kB (checked in by jesus, 4 years ago)

Move this into pgtreats

  • Property svn:executable set to *
Line 
1 package Sniffer::Postgres;
2 use strict;
3 use Sniffer::Connection::Postgres;
4 use base 'Class::Accessor::Faster';
5 use Data::Dumper;
6 use NetPacket::Ethernet;
7 use NetPacket::IP;
8 use NetPacket::TCP;
9 use Net::Pcap; # just for the convenience function below
10 use Net::Pcap::FindDevice;
11 use Carp qw(croak);
12
# Module version, exposed as the package global $Sniffer::Postgres::VERSION.
our $VERSION = '0.0.1';

# Net::Pcap dump handle used by run() when the capture_file option is given.
# It lives at file scope so that the END block inside run() can still flush
# and close it if the program exits while a capture is in progress.
my $save;
17
18 =head1 NAME
19
20 Sniffer::Postgres - multi-connection sniffer driver
21
22 =head1 SYNOPSIS
23
24   use Sniffer::Postgres;
25   my $VERBOSE = 0;
26
27   my $sniffer = Sniffer::Postgres->new(
28     callbacks => {
29       response => sub { my ($res,$pg) = @_; },
30       log      => sub { print $_[0] if $VERBOSE },
31       tcp_log  => sub { print $_[0] if $VERBOSE > 1 },
32     },
33     timeout => 5*60, # seconds after which a connection is considered stale
34     stale_connection
35       => sub { my ($s,$conn,$key) = @_;
36                $conn->log->("Connection $key is stale.");
37                $s->remove_connection($key);
38              },
39   );
40
41   $sniffer->run(); # uses the "best" default device
42
43   # Or, if you want to feed it the packets yourself:
44
45   while (1) {
46
47     # retrieve ethernet packet into $eth,
48     # for example via Net::Pcap
49     my $eth = sniff_ethernet_packet;
50
51     # And handle the packet. Callbacks will be invoked as soon
52     # as complete data is available
53     $sniffer->handle_eth_packet($eth);
54   }
55
56 This driver gives you callbacks with the completely accumulated
57 L<Postgres::Request>s or L<Postgres::Response>s as sniffed from the
58 TCP traffic. You need to feed it the Ethernet, IP or TCP packets
59 either from a dump file or from L<Net::Pcap> by unpacking them via
60 L<NetPacket>.
61
62 As the whole response data is accumulated in memory you should
63 be aware of memory issues. If you want to write stuff
64 directly to disk, you will need to submit patches to L<Sniffer::Connection::Postgres>.
65
66 =head1 METHODS
67
68 =head2 C<< new %ARGS >>
69
70 Creates a new object for handling many Postgres requests.
71 You can pass in the following arguments:
72
73   connections      - preexisting connections (optional)
74   callbacks        - callbacks for the new connections (hash reference)
75   timeout          - timeout in seconds after which a connection is considered stale
76   stale_connection - callback for stale connections
77   inflight         - destination port whose already-established TCP sessions are picked up mid-stream
78
79 =cut
80
# Generate read/write accessors for every field the sniffer keeps.
__PACKAGE__->mk_accessors(qw(
  inflight
  connections
  callbacks
  timeout
  pcap_device
  stale_connection
  last_sweep
));
82
sub new {
  my ($class,%args) = @_;

  $args{connections} ||= {};

  # Work on a private shallow copy of the callbacks. The 'log' default and
  # the 'closed' wrapper installed below must not be written into the
  # caller's hash. Otherwise a callbacks hash shared by several sniffers
  # would make each new sniffer wrap the previous sniffer's 'closed' wrapper.
  $args{callbacks} = { %{ $args{callbacks} || {} } };
  $args{callbacks}->{log} ||= sub {};

  # Default stale-connection handler: log through the connection and drop it.
  $args{stale_connection} ||= sub {
    my ($s,$conn,$key) = @_;
    $conn->log->("$key is stale.");
    $s->remove_connection($key);
  };

  $args{timeout} = 300
    unless exists $args{timeout};

  my $self = $class->SUPER::new(\%args);

  # Wrap the user's 'closed' callback (if any) so that a connection is
  # removed from the table as soon as it reports itself closed.
  # This relies on SUPER::new storing the callbacks hashref itself rather
  # than a copy, so the late change is visible through $self->callbacks and
  # reaches connections created afterwards.
  my $user_closed = delete $args{callbacks}->{closed};
  $args{callbacks}->{closed} = sub {
    my $key = $_[0]->flow;
    # Connections are stored under the "src:dest" key of the packet that
    # created them. If flow() does not match that key, try it reversed.
    if (! exists $args{connections}->{$key}) {
      warn "Error: flow() ne connection-key!";
      $key = join ":", reverse split /:/, $key;
    }
    $_[0]->{log}->("Removing $key");
    $self->remove_connection($key);
    # Tail-call the user's handler with the original arguments.
    goto &$user_closed
      if $user_closed;
  };
  $self->last_sweep(0);
  $self;
}
115
116 =head2 C<< $sniffer->remove_connection KEY >>
117
118 Removes a connection (or a key) from the list
119 of connections. This will not have the intended
120 effect if the connection is still alive, as it
121 will be recreated as soon as the next packet
122 for it is received.
123
124 =cut
125
sub remove_connection {
  my ($self, $key) = @_;

  # A connection object is translated into its table key via flow().
  # The key is tried as-is first, then with its two ports swapped.
  if (ref $key) {
    my $flow = $key->flow;
    unless (exists $self->connections->{$flow}) {
      warn "Error: flow() ne connection-key!";
      $flow = join ':', reverse split /:/, $flow;
    }
    $key = $flow;
  }

  # Hand back whatever delete() yields (the removed entry, if any).
  return delete $self->connections->{$key};
}
138
139 =head2 C<< $sniffer->find_or_create_connection TCP, %ARGS >>
140
141 This parses a TCP packet and creates the TCP connection
142 to keep track of the packet order and resent packets.
143
144 =cut
145
sub find_or_create_connection {
  my ($self, $tcp) = @_;
  my $connections = $self->connections;

  # A connection is keyed "port:port" in the direction of the packet that
  # created it, so a packet travelling the other way must also check the
  # swapped key before a new connection is created.
  my $forward = join ':', $tcp->{src_port}, $tcp->{dest_port};
  return $connections->{$forward} if exists $connections->{$forward};

  my $backward = join ':', $tcp->{dest_port}, $tcp->{src_port};
  return $connections->{$backward} if exists $connections->{$backward};

  # Unknown in both directions: start tracking it under the forward key.
  my $callbacks = $self->callbacks;
  $callbacks->{log}->("Creating connection $forward, sequence start at " . $tcp->{seqnum});
  $connections->{$forward} = Sniffer::Connection::Postgres->new(
    %$callbacks,
    tcp => $tcp,
  );
  return $connections->{$forward};
}
171
172 =head2 C<< $sniffer->stale_connections( TIMEOUT, TIMESTAMP, HANDLER ) >>
173
174 Will call the handler HANDLER for all connections that
175 have a C<last_activity> before TIMESTAMP - TIMEOUT.
176
177 All parameters are optional and default to:
178
179   TIMEOUT   - $sniffer->timeout
180   TIMESTAMP - time()
181   HANDLER   - $sniffer->stale_connection
182
183 It returns the stale connections found. A sweep runs at most once per TIMEOUT seconds; otherwise nothing is returned.
184
185 =cut
186
sub stale_connections {
  my ($self,$timeout,$timestamp,$handler) = @_;
  $timeout   ||= $self->timeout;
  $handler   ||= $self->stale_connection;
  $timestamp ||= time();

  my $cutoff = $timestamp - $timeout;

  my $connections = $self->connections;
  my @stale = ();   # [ key, connection object ] pairs

  # Sweeping walks every connection, so it is done at most once per timeout
  # period: only when the previous sweep is older than the cutoff.
  if ($self->last_sweep < $cutoff) {
    # Snapshot the stale objects *before* running any handler. The default
    # handler removes the connection from $connections, so looking the
    # objects up by key afterwards would only yield undef.
    @stale = map  { [ $_, $connections->{$_} ] }
             grep { $connections->{$_}->last_activity < $cutoff }
             keys %$connections;

    for my $entry (@stale) {
      my ($key, $conn) = @$entry;
      $handler->($self, $conn, $key);
    }
    $self->last_sweep($timestamp);
  }

  # List context: the stale connection objects; scalar context: their count.
  return map { $_->[1] } @stale;
}
208
209 =head2 C<< $sniffer->live_connections TIMEOUT, TIMESTAMP >>
210
211 Returns all live connections. No callback
212 mechanism is provided here.
213
214 The defaults are
215   TIMEOUT   - $sniffer->timeout
216   TIMESTAMP - time()
217
218 =cut
219
sub live_connections {
  my ($self, $timeout, $timestamp) = @_;
  $timeout   ||= $self->timeout;
  $timestamp ||= time();

  # Anything active at or after this instant counts as live.
  my $cutoff = $timestamp - $timeout;

  my @live;
  for my $conn (values %{ $self->connections }) {
    push @live, $conn if $conn->last_activity >= $cutoff;
  }

  # List context: the live connections; scalar context: how many there are.
  return @live;
}
230
231 =head2 C<< $sniffer->handle_eth_packet ETH [, TIMESTAMP] >>
232
233 Processes a raw ethernet packet. L<Net::Pcap> will return
234 this kind of packet for most Ethernet network cards.
235
236 You need to call this method (or one of the other protocol
237 methods) for every packet you wish to handle.
238
239 The optional TIMESTAMP corresponds to the epoch time
240 the packet was captured at. It defaults to the value
241 of C<time()>.
242
243 =cut
244
sub handle_eth_packet {
  my ($self, $eth, $ts) = @_;

  # Without an explicit capture time the packet is stamped with "now".
  my $when  = $ts || time();

  # Strip the Ethernet framing and hand the enclosed IP datagram on.
  my $frame = NetPacket::Ethernet->decode($eth);
  return $self->handle_ip_packet($frame->{data}, $when);
}
251
252 =head2 C<< $sniffer->handle_ip_packet IP [, TIMESTAMP] >>
253
254 Processes a raw ip packet.
255
256 The optional TIMESTAMP corresponds to the epoch time
257 the packet was captured at. It defaults to the value
258 of C<time()>.
259
260 =cut
261
sub handle_ip_packet {
  my ($self,$ip,$ts) = @_;
  $ts ||= time();

  my $i = NetPacket::IP->decode($ip);

  # Safeguard against malformed IP headers
  $i->{hlen} = 5
      if $i->{hlen} < 5;

  # Workaround for a bug in NetPacket::IP v0.04: it leaves any link-layer
  # trailer (e.g. Ethernet padding) in the payload. Trim the payload to the
  # length announced in the IP header. That length may be unusable: a total
  # length of 0 (reported e.g. for segmentation-offloaded packets captured on
  # the sending host), or one not larger than the header itself. A zero or
  # negative substr() length would then truncate or empty the TCP segment, so
  # the payload is kept unchanged in those cases.
  my $payload     = $i->{data};
  my $payload_len = ($i->{len} || 0) - $i->{hlen} * 4;
  if ($payload_len > 0 && $payload_len < length($payload)) {
    $payload = substr($payload, 0, $payload_len);
  }

  my $conn = $self->handle_tcp_packet($payload, $ts);

  # Record the endpoint addresses the first time they are missing. They are
  # taken from whichever packet gets here first, in that packet's direction.
  unless($conn->tcp_connection->dest_host) {
    $conn->tcp_connection->dest_host($i->{dest_ip});
    $conn->tcp_connection->src_host($i->{src_ip});
  }
  $conn;
}
280
281 =head2 C<< $sniffer->handle_tcp_packet TCP [, TIMESTAMP] >>
282
283 Processes a raw tcp packet. This processes the packet
284 by handing it off to the L<Sniffer::Connection> which handles
285 the reordering of TCP packets.
286
287 It returns the L<Sniffer::Connection::Postgres> object that
288 handled the packet.
289
290 The optional TIMESTAMP corresponds to the epoch time
291 the packet was captured at. It defaults to the value
292 of C<time()>.
293
294 =cut
295
sub handle_tcp_packet {
  my ($self, $tcp, $ts) = @_;
  $ts ||= time();

  # Accept either an already-decoded NetPacket::TCP object or raw bytes.
  $tcp = NetPacket::TCP->decode($tcp) unless ref $tcp;

  my $conn = $self->find_or_create_connection($tcp);

  # Mid-stream pickup: a connection that has no TCP status yet and whose
  # packet is headed for the configured in-flight port is seeded from this
  # packet and treated as already established.
  my $seed_midstream = !defined $conn->tcp_connection->status
                    && defined $self->inflight
                    && $tcp->{dest_port} == $self->inflight;
  if ($seed_midstream) {
    $conn->tcp_connection->init_from_packet($tcp);
    $conn->tcp_connection->status('ACK');
  }

  $conn->handle_packet($tcp, $ts);

  # Every packet gives the stale-connection sweep a chance to run, using
  # the packet's own timestamp as "now".
  $self->stale_connections(undef, $ts);

  # The caller gets back the connection this packet belongs to.
  return $conn;
}
319
320 =head2 C<< run DEVICE, PCAP_FILTER, %OPTIONS >>
321
322 Listens on the given device for all TCP
323 traffic matching PCAP_FILTER (default C<tcp port 5432>) and invokes the callbacks
324 as necessary. If you want finer control
325 over what C<Net::Pcap> does, you need to set up
326 Net::Pcap yourself.
327
328 The C<DEVICE> parameter is used to determine
329 the device via C<find_device> from L<Net::Pcap::FindDevice>.
330
331 The C<%OPTIONS> can be the following options:
332
333   capture_file - filename to which the whole capture stream is
334                  written, in L<Net::Pcap> format. This is mostly
335                  useful for remote debugging of a problematic
336                  sequence of connections.
337
338 =cut
339
sub run {
  my ($self,$device_name,$pcap_filter,%options) = @_;

  my $device = find_device($device_name);
  $pcap_filter ||= "tcp port 5432";

  my $err;
  my ($address, $netmask);
  if (Net::Pcap::lookupnet($device, \$address, \$netmask, \$err)) {
    die 'Unable to look up device information for ', $device, ' - ', $err;
  }
  warn $err if $err;

  #   Create packet capture object on device
  my $pcap = Net::Pcap::open_live($device, 128000, -1, 500, \$err);
  unless (defined $pcap) {
    die "Unable to create packet capture on device '$device' - $err";
  }

  $self->pcap_device($pcap);

  # Net::Pcap::compile() and setfilter() return 0 on success and -1 on
  # failure, hence "&& die".
  my $filter;
  Net::Pcap::compile(
    $pcap,
    \$filter,
    $pcap_filter,
    0,
    $netmask
  ) && die 'Unable to compile packet capture filter';
  Net::Pcap::setfilter($pcap,$filter)
    && die 'Unable to set packet capture filter - ' . Net::Pcap::geterr($pcap);

  if ($options{capture_file}) {
    $save = Net::Pcap::dump_open($pcap,$options{capture_file})
      or die "Unable to open capture file '$options{capture_file}' - " . Net::Pcap::geterr($pcap);
    END {
      # Emergency cleanup
      if ($save) {
        Net::Pcap::dump_flush($save);
        Net::Pcap::dump_close($save);
        undef $save;
      }
    }
  }

  # Capture forever (count -1). Each packet is optionally copied into the
  # dump file and then handled, stamped with its capture time.
  my $rc = Net::Pcap::loop($pcap, -1, sub {
    my (undef, $header, $packet) = @_;
    Net::Pcap::dump($save, $header, $packet) if $save;
    $self->handle_eth_packet($packet, $header->{tv_sec} + $header->{tv_usec} / 1000000.0 );
  }, '');

  if ($save) {
    Net::Pcap::dump_flush($save);
    Net::Pcap::dump_close($save);
    undef $save;
  }

  # loop() returns -1 on error, 0 once the count is exhausted and -2 after
  # breakloop(). Only -1 is a failure. The dump file is already closed at
  # this point, so dying here loses no captured data.
  die 'Unable to perform packet capture - ' . Net::Pcap::geterr($pcap)
    if $rc == -1;
  return;
}
397
398 =head2 C<< run_file FILENAME, PCAP_FILTER >>
399
400 "Listens" to the packets dumped into
401 a file. This is convenient to use if you
402 have packet captures from a remote machine
403 or want to test new protocol sniffers.
404
405 The file is presumed to contain an ethernet
406 stream of packets.
407
408 =cut
409
sub run_file {
  my ($self, $filename, $pcap_filter) = @_;

  $pcap_filter ||= "tcp port 5432";

  my $err;

  my $pcap = Net::Pcap::open_offline($filename, \$err);
  unless (defined $pcap) {
    croak "Unable to create packet capture from filename '$filename': $err";
  }
  $self->pcap_device($pcap);

  # Net::Pcap::compile() and setfilter() return 0 on success and -1 on
  # failure, hence "&& die". An offline capture has no netmask, so 0 is used.
  my $filter;
  Net::Pcap::compile(
    $pcap,
    \$filter,
    $pcap_filter,
    0,
    0,
  ) && die 'Unable to compile packet capture filter';
  Net::Pcap::setfilter($pcap,$filter)
    && die 'Unable to set packet capture filter - ' . Net::Pcap::geterr($pcap);

  # Replay every packet in the file, stamped with its recorded capture time.
  # The return value of Net::Pcap::loop() is passed through to the caller.
  return Net::Pcap::loop($pcap, -1, sub {
    my (undef, $header, $packet) = @_;
    $self->handle_eth_packet($packet, $header->{tv_sec} + $header->{tv_usec} / 1000000.0);
  }, '');
}

1;
438
439 =head1 CALLBACKS
440
441 =head2 C<< request REQ, CONN >>
442
443 The C<request> callback is called with the parsed request and the connection
444 object. The request will be an instance of L<Postgres::Request> and will
445 have an absolute URI if possible. Currently, the hostname for the absolute URI
446 is constructed from the C<Host:> header.
447
448 =head2 C<< response RES, REQ, CONN >>
449
450 The C<response> callback is called with the parsed response, the request
451 and the connection object.
452
453 =head2 C<< log MESSAGE >>
454
455 The C<log> callback is called whenever the connection makes progress
456 and in other various situations.
457
458 =head2 C<< tcp_log MESSAGE >>
459
460 The C<tcp_log> callback is passed on to the underlying C<Sniffer::Connection>
461 object and can be used to monitor the TCP connection.
462
463 =head2 C<< stale_connection SNIFFER, CONN, KEY >>
464
465 Is called with the sniffer, the connection object and its key whenever
466 a connection goes over the C<timeout> limit without any activity. The
467 default handler weeds out stale connections with the following code:
468
469   sub {
470     my ($s,$conn,$key) = @_;
471     $conn->log->("$key is stale.");
472     $s->remove_connection($key);
473   }
474
475 =head1 EXAMPLE PCAP FILTERS
476
477 Here are some example Net::Pcap filters for common things:
478
479 Capture all Postgres traffic between your machine and C<db.example.com>:
480
481      (dest db.example.com && (tcp port 5432))
482   || (src  db.example.com && (tcp port 5432))
483
484 Capture all Postgres traffic between your machine
485 and C<db1.example.com> or C<db2.example.com>:
486
487     (dest db1.example.com && (tcp port 5432))
488   ||(src db1.example.com  && (tcp port 5432))
489   ||(dest db2.example.com && (tcp port 5432))
490   ||(src db2.example.com  && (tcp port 5432))
491
492 Note that Net::Pcap resolves the IP addresses before using them, so you might
493 actually get more data than you asked for.
494
495 =head1 BUGS
496
497 =head1 AUTHOR
498
499 Theo Schlossnagle (jesus@omniti.com)
500
501 =head1 COPYRIGHT
502
503 Copyright (C) 2005 Max Maischein.  All Rights Reserved.
504 Copyright (C) 2010 OmniTI Computer Consulting, Inc.  All Rights Reserved.
505
506 This code is free software; you can redistribute it and/or modify it
507 under the same terms as Perl itself.
508
509 =cut
Note: See TracBrowser for help on using the browser.