Changeset 87 for trunk

Show
Ignore:
Timestamp:
03/04/10 21:52:37 (4 years ago)
Author:
depesz
Message:

omnipitr-archive is working

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/omnipitr/doc/omnipitr-archive.pod

    r82 r87  
    5555 
    5656Log verbosely what is happening. 
     57 
     58=item --gzip-path (-gp) 
     59 
     60Full path to gzip program - in case you can't set proper PATH environment 
     61variable. 
     62 
     63=item --bzip2-path (-bp) 
     64 
     65Full path to bzip2 program - in case you can't set proper PATH environment 
     66variable. 
     67 
     68=item --lzma-path (-lp) 
     69 
     70Full path to lzma program - in case you can't set proper PATH environment 
     71variable. 
    5772 
    5873=back 
     
    166181possible. 
    167182 
     183All programs are passed I<--stdout> option to compress without modifying source file. 
     184 
    168185If you want to pass any extra arguments to compression program, you can either: 
    169186 
  • trunk/omnipitr/lib/OmniPITR/Log.pm

    r71 r87  
    66use File::Basename; 
    77use File::Path; 
     8use Data::Dumper; 
    89use POSIX qw(strftime floor); 
    910use IO::File; 
     
    1415 
    1516sub new { 
    16     my $class                 = shift; 
     17    my $class = shift; 
    1718    my ( $filename_template ) = @_; 
    1819    croak( 'Logfile name template was not provided!' ) unless $filename_template; 
    1920 
    20     my $self                  = bless {}, $class; 
     21    my $self = bless {}, $class; 
    2122 
    2223    $self->{ 'template' }       = $filename_template; 
     
    3435    my $log_line_prefix = $self->get_log_line_prefix(); 
    3536    my $fh              = $self->get_log_fh(); 
     37 
     38    @args = map { ref $_ ? Dumper( $_ ) : $_ } @args; 
    3639 
    3740    my $message = sprintf $format, @args; 
     
    6164    my $self = shift; 
    6265    $self->_log( 'FATAL', @_ ); 
    63     exit(1); 
     66    exit( 1 ); 
     67
     68 
     69sub time_start { 
     70    my $self    = shift; 
     71    my $comment = shift; 
     72    $self->{ 'timers' }->{ $comment } = time(); 
     73    return; 
     74
     75 
     76sub time_finish { 
     77    my $self    = shift; 
     78    my $comment = shift; 
     79    my $start   = delete $self->{ 'timers' }->{ $comment }; 
     80    $self->log( 'Timer [%s] took: %.3fs', $comment, time() - ( $start || 0 ) ); 
     81    return; 
    6482} 
    6583 
  • trunk/omnipitr/lib/OmniPITR/Program/Archive.pm

    r86 r87  
    22use strict; 
    33use warnings; 
     4 
    45use base qw( OmniPITR::Program ); 
     6 
    57use Carp; 
     8use OmniPITR::Tools qw( :all ); 
    69use English qw( -no_match_vars ); 
    710use File::Basename; 
    811use File::Spec; 
    9 use File::Path qw( make_path ); 
     12use File::Path qw( make_path remove_tree ); 
    1013use File::Copy; 
    1114use Storable; 
    1215use Getopt::Long; 
    13 use Data::Dumper; 
    14 use Digest::MD5; 
     16 
     17=head1 run() 
     18 
     19Main function, called by actual script in bin/, wraps all work done by script with the sole exception of reading and validating command line arguments. 
     20 
     21These tasks (reading and validating arguments) are in this module, but they are called from L<OmniPITR::Program::new()> 
     22 
     23The names of the called methods should be self-explanatory; if you need further information, simply check the documentation for the method you have questions about. 
     24 
     25=cut 
    1526 
    1627sub run { 
     
    1829    $self->read_state(); 
    1930    $self->prepare_temp_directory(); 
    20     $self->copy_segment_to_temp_dir(); 
    21 
    22  
    23 sub copy_segment_to_temp_dir { 
    24     my $self = shift; 
    25     return if $self->segment_already_copied(); 
    26     my $new_file = $self->get_temp_filename_for( 'none' ); 
    27     unless ( copy( $self->{'segment'}, $new_file ) ) { 
    28         $self->log->fatal('Cannot copy %s to %s : %s', $self->{'segment'}, $new_file, $OS_ERROR ); 
    29     } 
    30     my $has_md5 = $self->md5sum( $new_file ); 
    31     $self->{'state'}->{'compressed'}->{'none'} = $has_md5; 
    32     $self->save_state(); 
    33     return; 
    34 
    35  
    36 sub segment_already_copied { 
    37     my $self = shift; 
    38     return unless $self->{ 'state' }->{ 'compressed' }->{ 'none' }; 
    39     my $want_md5 = $self->{ 'state' }->{ 'compressed' }->{ 'none' }; 
    40  
    41     my $temp_file_name = $self->get_temp_filename_for( 'none' ); 
     31    $self->make_all_necessary_compressions(); 
     32    $self->send_to_destinations(); 
     33    $self->cleanup(); 
     34    $self->log->log( 'Segment %s successfully sent to all destinations.', $self->{ 'segment' } ); 
     35    return; 
     36
     37 
     38=head1 send_to_destinations() 
     39 
     40Does all the actual sending of segments to local and remote destinations. 
     41 
     42It keeps its state to be able to continue in case of error. 
     43 
     44Since both local and remote destinations are handled in the same way, there is no point in duplicating the code to 2 methods. 
     45 
     46Important notice - this function has to have the ability to choose whether to use temp file (for compressed destinations), or original segment (for uncompressed ones). This is done by this line: 
     47 
     48    my $local_file = $dst->{ 'compression' } eq 'none' ? $self->{ 'segment' } : $self->get_temp_filename_for( $dst->{ 'compression' } ); 
     49 
     50=cut 
     51 
     52sub send_to_destinations { 
     53    my $self = shift; 
     54 
     55    for my $destination_type ( qw( local remote ) ) { 
     56        next unless my $dst_list = $self->{ 'destination' }->{ $destination_type }; 
     57        for my $dst ( @{ $dst_list } ) { 
     58            next if $self->segment_already_sent( $destination_type, $dst ); 
     59 
     60            my $local_file = $dst->{ 'compression' } eq 'none' ? $self->{ 'segment' } : $self->get_temp_filename_for( $dst->{ 'compression' } ); 
     61 
     62            my $destination_file_path = $dst->{ 'path' }; 
     63            $destination_file_path =~ s{/*\z}{}; 
     64            $destination_file_path .= '/' . basename( $local_file ); 
     65 
     66            my $comment = 'Sending ' . $local_file . ' to ' . $destination_file_path; 
     67 
     68            $self->log->time_start( $comment ) if $self->{ 'verbose' }; 
     69            my $response = run_command( $self->{ 'temp-dir' }, 'rsync', $local_file, $destination_file_path ); 
     70            $self->log->time_finish( $comment ) if $self->{ 'verbose' }; 
     71 
     72            if ( $response->{ 'error_code' } ) { 
     73                $self->log->fatal( "Cannot send segment %s to %s : %s", $local_file, $destination_file_path, $response ); 
     74            } 
     75            $self->{ 'state' }->{ 'sent' }->{ $destination_type }->{ $dst->{ 'path' } } = 1; 
     76            $self->save_state(); 
     77        } 
     78    } 
     79    return; 
     80
     81 
     82=head1 segment_already_sent() 
     83 
     84Simple function, that checks if segment has been already sent to given destination, and if yes - logs such information. 
     85 
     86=cut 
     87 
     88sub segment_already_sent { 
     89    my $self = shift; 
     90    my ( $type, $dst ) = @_; 
     91    return unless $self->{ 'state' }->{ 'sent' }->{ $type }; 
     92    return unless $self->{ 'state' }->{ 'sent' }->{ $type }->{ $dst->{ 'path' } }; 
     93    $self->log->log( 'Segment already sent to %s. Skipping.', $dst->{ 'path' } ); 
     94    return 1; 
     95
     96 
     97=head1 cleanup() 
     98 
     99Function is called only if segment has been successfully compressed and sent to all destinations. 
     100 
     101It basically removes tempdir with compressed copies of segment, and state file for given segment. 
     102 
     103=cut 
     104 
     105sub cleanup { 
     106    my $self = shift; 
     107    remove_tree( $self->{ 'temp-dir' } ); 
     108    unlink $self->{ 'state-file' }; 
     109    return; 
     110
     111 
     112=head1 get_list_of_all_necessary_compressions() 
     113 
     114Scans list of destinations, and gathers list of all compressions that have to be made. 
     115 
     116This is to be able to compress file only once even when having multiple destinations that require compressed format. 
     117 
     118=cut 
     119 
     120sub get_list_of_all_necessary_compressions { 
     121    my $self = shift; 
     122 
     123    my %compression = (); 
     124 
     125    for my $dst_type ( qw( local remote ) ) { 
     126        next unless my $dsts = $self->{ 'destination' }->{ $dst_type }; 
     127        for my $destination ( @{ $dsts } ) { 
     128            $compression{ $destination->{ 'compression' } } = 1; 
     129        } 
     130    } 
     131    $self->{ 'compressions' } = [ grep { $_ ne 'none' } keys %compression ]; 
     132    return; 
     133
     134 
     135=head1 make_all_necessary_compressions() 
     136 
     137Wraps all work required to compress segment to all necessary formats. 
     138 
     139Call to the actual compressor has to be done via "bash -c" to be able to easily use the run_command() function, which has the side benefits of capturing stdout and stderr and properly fetching error codes. 
     140 
     141Overhead of additional fork+exec for bash should be negligible. 
     142 
     143=cut 
     144 
     145sub make_all_necessary_compressions { 
     146    my $self = shift; 
     147    $self->get_list_of_all_necessary_compressions(); 
     148 
     149    for my $compression ( @{ $self->{ 'compressions' } } ) { 
     150        next if $self->segment_already_compressed( $compression ); 
     151        my $compressed_filename = $self->get_temp_filename_for( $compression ); 
     152 
     153        my $compressor_binary = $self->{ $compression . '-path' } || $compression; 
     154 
     155        my $compression_command = sprintf 'nice %s --stdout %s > %s', $compressor_binary, quotemeta( $self->{ 'segment' } ), quotemeta( $compressed_filename ); 
     156 
     157        $self->log->time_start( 'Compressing with ' . $compression ) if $self->{ 'verbose' }; 
     158        my $response = run_command( $self->{ 'temp-dir' }, 'bash', '-c', $compression_command ); 
     159        $self->log->time_finish( 'Compressing with ' . $compression ) if $self->{ 'verbose' }; 
     160 
     161        if ( $response->{ 'error_code' } ) { 
     162            $self->log->fatal( 'Error while compressing with %s : %s', $compression, $response ); 
     163        } 
     164 
     165        $self->{ 'state' }->{ 'compressed' }->{ $compression } = file_md5sum( $compressed_filename ); 
     166        $self->save_state(); 
     167    } 
     168    return; 
     169
     170 
     171=head1 segment_already_compressed() 
     172 
     173Helper function which checks if segment has been already compressed. 
     174 
     175It uses state file, and checks compressed file md5sum to be sure that the file wasn't damaged between prior run and now. 
     176 
     177=cut 
     178 
     179sub segment_already_compressed { 
     180    my $self = shift; 
     181    my $type = shift; 
     182    return unless $self->{ 'state' }->{ 'compressed' }->{ $type }; 
     183    my $want_md5 = $self->{ 'state' }->{ 'compressed' }->{ $type }; 
     184 
     185    my $temp_file_name = $self->get_temp_filename_for( $type ); 
    42186    return unless -e $temp_file_name; 
    43187 
    44     my $has_md5 = $self->md5sum( $temp_file_name ); 
     188    my $has_md5 = file_md5sum( $temp_file_name ); 
    45189    if ( $has_md5 eq $want_md5 ) { 
    46         $self->log->log('Segment has been already copied to temp location.'); 
     190        $self->log->log( 'Segment has been already compressed with %s.', $type ); 
    47191        return 1; 
    48192    } 
    49193 
    50194    unlink $temp_file_name; 
    51     $self->log->error( 'Segment already copied, but with bad MD5 ?!' ); 
    52  
    53     return; 
    54 
     195    $self->log->error( 'Segment already compressed to %s, but with bad MD5 (file: %s, state: %s), recompressing.', $type, $has_md5, $want_md5 ); 
     196 
     197    return; 
     198
     199 
     200=head1 get_temp_filename_for() 
     201 
     202Helper function to build full (with path) filename for compressed segment, assuming given compression. 
     203 
     204=cut 
    55205 
    56206sub get_temp_filename_for { 
     
    58208    my $type = shift; 
    59209 
    60     return File::Spec->catfile( $self->{ 'temp-dir' }, $type ); 
    61 
    62  
    63 sub md5sum { 
    64     my $self     = shift; 
    65     my $filename = shift; 
    66  
    67     my $ctx = Digest::MD5->new; 
    68  
    69     open my $fh, '<', $filename or $self->log->fatal( 'Cannot open file for md5summing %s : %s', $filename, $OS_ERROR ); 
    70     $ctx->addfile( $fh ); 
    71     my $md5 = $ctx->hexdigest(); 
    72     close $fh; 
    73  
    74     return $md5; 
    75 
     210    return File::Spec->catfile( $self->{ 'temp-dir' }, basename( $self->{ 'segment' } ) . ext_for_compression( $type ) ); 
     211
     212 
     213=head1 prepare_temp_directory() 
     214 
     215Helper function, which builds path for temp directory, and creates it. 
     216 
     217Path is generated by using given temp-dir, 'omnipitr-archive' name, and filename of segment. 
     218 
     219For example, for temp-dir '/tmp' and segment being pg_xlog/000000010000000000000003, actual, used temp directory would be /tmp/omnipitr-archive/000000010000000000000003/. 
     220 
     221=cut 
    76222 
    77223sub prepare_temp_directory { 
     
    83229} 
    84230 
     231=head1 read_state() 
     232 
     233Helper function to read state from state file. 
     234 
     235Name of state file is the same as filename of WAL segment being archived, but it is in state-dir. 
     236 
     237=cut 
     238 
    85239sub read_state { 
    86240    my $self = shift; 
     
    95249} 
    96250 
     251=head1 save_state() 
     252 
     253Helper function to save state to state-file. 
     254 
     255=cut 
     256 
    97257sub save_state { 
    98258    my $self = shift; 
     
    105265} 
    106266 
     267=head1 read_args() 
     268 
     269Function which does all the parsing, and transformation of command line arguments. 
     270 
     271It also verifies basic facts about the passed WAL segment name, but all other validations are done in a separate function: L<validate_args()>. 
     272 
     273=cut 
     274 
    107275sub read_args { 
    108276    my $self = shift; 
     
    111279 
    112280    my %args = ( 
    113         'data-dir' => '.', 
    114         'temp-dir' => $ENV{ 'TMPDIR' } || '/tmp', 
     281        'data-dir'   => '.', 
     282        'temp-dir'   => $ENV{ 'TMPDIR' } || '/tmp', 
     283        'gzip-path'  => 'gzip', 
     284        'bzip2-path' => 'bzip2', 
     285        'lzma-path'  => 'lzma', 
    115286    ); 
    116287 
     
    118289        unless GetOptions( 
    119290        \%args, 
     291        'bzip2-path|bp=s', 
    120292        'data-dir|D=s', 
    121293        'dst-local|dl=s@', 
    122294        'dst-remote|dr=s@', 
     295        'gzip-path|gp=s', 
     296        'log|l=s', 
     297        'lzma-path|lp=s', 
     298        'pid-file=s', 
     299        'state-dir|s=s', 
    123300        'temp-dir|t=s', 
    124         'log|l=s', 
    125         'state-dir|s=s', 
    126         'pid-file=s', 
    127         'verbose|v' 
     301        'verbose|v', 
    128302        ); 
    129303 
    130304    croak( '--log was not provided - cannot continue.' ) unless $args{ 'log' }; 
    131305 
    132     for my $key ( qw( data-dir temp-dir state-dir pid-file verbose ) ) { 
     306    for my $key ( qw( data-dir temp-dir state-dir pid-file verbose gzip-path bzip2-path lzma-path ) ) { 
    133307        $self->{ $key } = $args{ $key }; 
    134308    } 
     
    168342} 
    169343 
     344=head1 validate_args() 
     345 
     346Does all necessary validation of given command line arguments. 
     347 
     348The one exception is compression program paths - technically, they could be validated here, but the benefit would be pretty limited and the code to do so relatively complex, as a compression program path 
     349might, but doesn't have to, be an actual file path - it might be just a program name (without path), which is the default. 
     350 
     351=cut 
     352 
    170353sub validate_args { 
    171354    my $self = shift;