Changeset 140

Show
Ignore:
Timestamp:
05/17/10 20:45:12 (4 years ago)
Author:
depesz
Message:

Alpha version. Works, but requires still docs and tests.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/omnipitr/lib/OmniPITR/Program/Backup/Master.pm

    r131 r140  
    1515use File::Copy; 
    1616use Storable; 
     17use Cwd; 
    1718use Getopt::Long qw( :config no_ignore_case ); 
    1819 
    1920sub run { 
    2021    my $self = shift; 
    21     $self->read_state(); 
    2222    $self->get_list_of_all_necessary_compressions(); 
    2323    $self->choose_base_local_destinations(); 
    2424 
    25     return; 
     25    $self->start_pg_backup(); 
     26    $self->compress_pgdata(); 
     27 
     28    $self->stop_pg_backup(); 
     29    $self->compress_xlogs(); 
     30 
     31    $self->deliver_to_all_destinations(); 
     32 
     33    $self->log->log( 'All done%s.', $self->{ 'had_errors' } ? ' with errors' : '' ); 
     34    exit( 1 ) if $self->{ 'had_errors' }; 
     35 
     36    return; 
     37
     38 
     39sub deliver_to_all_destinations { 
     40    my $self = shift; 
     41 
     42    # $self->log->log( '%s', $self ); 
     43 
     44    $self->deliver_to_all_local_destinations(); 
     45 
     46    $self->deliver_to_all_remote_destinations(); 
     47 
     48    return; 
     49
     50 
     51sub deliver_to_all_local_destinations { 
     52    my $self = shift; 
     53    return unless $self->{ 'destination' }->{ 'local' }; 
     54    for my $dst ( @{ $self->{ 'destination' }->{ 'local' } } ) { 
     55        next if $dst->{ 'path' } eq $self->{ 'base' }->{ $dst->{ 'compression' } }; 
     56 
     57        my $B = $self->{ 'base' }->{ $dst->{ 'compression' } }; 
     58 
     59        for my $type ( qw( data xlog ) ) { 
     60 
     61            my $filename = $self->get_archive_filename( $type, $dst->{ 'compression' } ); 
     62            my $source_filename = File::Spec->catfile( $B, $filename ); 
     63            my $destination_filename = File::Spec->catfile( $dst->{ 'path' }, $filename ); 
     64 
     65            my $time_msg = sprintf 'Copying %s to %s', $source_filename, $destination_filename; 
     66            $self->log->time_start( $time_msg ) if $self->verbose; 
     67 
     68            my $rc = copy( $source_filename, $destination_filename ); 
     69 
     70            $self->log->time_finish( $time_msg ) if $self->verbose; 
     71 
     72            unless ( $rc ) { 
     73                $self->log->error( 'Cannot copy %s to %s : %s', $source_filename, $destination_filename, $OS_ERROR ); 
     74                $self->{ 'had_errors' } = 1; 
     75            } 
     76 
     77        } 
     78    } 
     79    return; 
     80
     81 
     82sub deliver_to_all_remote_destinations { 
     83    my $self = shift; 
     84    return unless $self->{ 'destination' }->{ 'remote' }; 
     85    for my $dst ( @{ $self->{ 'destination' }->{ 'remote' } } ) { 
     86 
     87        my $B = $self->{ 'base' }->{ $dst->{ 'compression' } }; 
     88 
     89        for my $type ( qw( data xlog ) ) { 
     90 
     91            my $filename = $self->get_archive_filename( $type, $dst->{ 'compression' } ); 
     92            my $source_filename = File::Spec->catfile( $B, $filename ); 
     93            my $destination_filename = $dst->{ 'path' }; 
     94            $destination_filename =~ s{/*\z}{/}; 
     95            $destination_filename .= $filename; 
     96 
     97            my $time_msg = sprintf 'Copying %s to %s', $source_filename, $destination_filename; 
     98            $self->log->time_start( $time_msg ) if $self->verbose; 
     99 
     100            my $response = run_command( $self->{ 'temp-dir' }, $self->{ 'rsync-path' }, $source_filename, $destination_filename ); 
     101 
     102            $self->log->time_finish( $time_msg ) if $self->verbose; 
     103 
     104            if ( $response->{ 'error_code' } ) { 
     105                $self->log->error( 'Cannot send archive %s to %s: %s', $source_filename, $destination_filename, $response ); 
     106                $self->{ 'had_errors' } = 1; 
     107            } 
     108        } 
     109    } 
     110    return; 
     111
     112 
     113sub compress_xlogs { 
     114    my $self = shift; 
     115    $self->log->time_start( 'Compressing xlogs' ) if $self->verbose; 
     116    $self->start_writers( 'xlog' ); 
     117 
     118    $self->tar_and_compress( 
     119        'work_dir' => $self->{ 'xlogs' } . '.real', 
     120        'tar_dir'  => basename( $self->{ 'data-dir' } ), 
     121    ); 
     122    $self->log->time_finish( 'Compressing xlogs' ) if $self->verbose; 
     123 
     124    return; 
     125
     126 
     127sub compress_pgdata { 
     128    my $self = shift; 
     129    $self->log->time_start( 'Compressing $PGDATA' ) if $self->verbose; 
     130    $self->start_writers( 'data' ); 
     131 
     132    $self->tar_and_compress( 
     133        'work_dir' => dirname( $self->{ 'data-dir' } ), 
     134        'tar_dir'  => basename( $self->{ 'data-dir' } ), 
     135        'excludes' => [ qw( pg_log/* pg_xlog/0* pg_xlog/archive_status/* postmaster.pid ) ], 
     136    ); 
     137 
     138    $self->log->time_finish( 'Compressing $PGDATA' ) if $self->verbose; 
     139    return; 
     140
     141 
     142sub tar_and_compress { 
     143    my $self = shift; 
     144    my %ARGS = @_; 
     145 
     146    my @compression_command = ( $self->{ 'nice-path' }, $self->{ 'tar-path' }, 'cf', '-' ); 
     147    if ( $ARGS{ 'excludes' } ) { 
     148        push @compression_command, map { sprintf '--exclude=%s/%s', $ARGS{ 'tar_dir' }, $_ } @{ $ARGS{ 'excludes' } }; 
     149    } 
     150    push @compression_command, $ARGS{ 'tar_dir' }; 
     151 
     152    my $compression_str = join ' ', map { quotemeta $_ } @compression_command; 
     153 
     154    $self->prepare_temp_directory(); 
     155    my $tar_stderr_filename = File::Spec->catfile( $self->{ 'temp-dir' }, 'tar.stderr' ); 
     156    $compression_str .= ' 2> ' . quotemeta( $tar_stderr_filename ); 
     157 
     158    my $previous_dir = getcwd; 
     159    chdir $ARGS{ 'work_dir' } if $ARGS{ 'work_dir' }; 
     160 
     161    my $tar; 
     162    unless ( open $tar, '-|', $compression_str ) { 
     163        $self->clean_and_die( 'Cannot start tar (%s) : %s', $compression_str, $OS_ERROR ); 
     164    } 
     165 
     166    chdir $previous_dir if $ARGS{ 'work_dir' }; 
     167 
     168    my $buffer; 
     169    while ( my $len = sysread( $tar, $buffer, 8192 ) ) { 
     170        while ( my ( $type, $fh ) = each %{ $self->{ 'writers' } } ) { 
     171            my $written = syswrite( $fh, $buffer, $len ); 
     172            next if $written == $len; 
     173            $self->clean_and_die( "Writting %u bytes to filehandle for <%s> compression wrote only %u bytes ?!", $len, $written ); 
     174        } 
     175    } 
     176    close $tar; 
     177 
     178    for my $fh ( values %{ $self->{ 'writers' } } ) { 
     179        close $fh; 
     180    } 
     181 
     182    delete $self->{ 'writers' }; 
     183 
     184    my $stderr_output; 
     185    my $stderr; 
     186    unless ( open $stderr, '<', $tar_stderr_filename ) { 
     187        $self->log->log( 'Cannot open tar stderr file (%s) for reading: %s', $tar_stderr_filename ); 
     188        return; 
     189    } 
     190    { 
     191        local $/; 
     192        $stderr_output = <$stderr>; 
     193    }; 
     194    close $stderr; 
     195    return unless $stderr_output; 
     196    $self->log->log( 'Tar (%s) generated these output on stderr:', $compression_str ); 
     197    $self->log->log( '==============================================' ); 
     198    $self->log->log( '%s', $stderr_output ); 
     199    $self->log->log( '==============================================' ); 
     200    unlink $tar_stderr_filename; 
     201    return; 
     202
     203 
     204sub start_writers { 
     205    my $self      = shift; 
     206    my $data_type = shift; 
     207 
     208    my %writers = (); 
     209 
     210    COMPRESSION: 
     211    while ( my ( $type, $dst_path ) = each %{ $self->{ 'base' } } ) { 
     212        my $filename = $self->get_archive_filename( $data_type, $type ); 
     213 
     214        my $full_file_path = File::Spec->catfile( $dst_path, $filename ); 
     215 
     216        if ( $type eq 'none' ) { 
     217            if ( open my $fh, '>', $full_file_path ) { 
     218                $writers{ $type } = $fh; 
     219                $self->log->log( "Starting \"none\" writer to $full_file_path" ) if $self->verbose; 
     220                next COMPRESSION; 
     221            } 
     222            $self->clean_and_die( 'Cannot write to %s : %s', $full_file_path, $OS_ERROR ); 
     223        } 
     224 
     225        my @command = map { quotemeta $_ } ( $self->{ 'nice-path' }, $self->{ $type . '-path' }, '--stdout', '-' ); 
     226        push @command, ( '>', quotemeta( $full_file_path ) ); 
     227 
     228        $self->log->log( "Starting \"%s\" writer to %s", $type, $full_file_path ) if $self->verbose; 
     229        if ( open my $fh, '|-', join( ' ', @command ) ) { 
     230            $writers{ $type } = $fh; 
     231            next COMPRESSION; 
     232        } 
     233        $self->clean_and_die( 'Cannot open command. Error: %s, Command: %s', $OS_ERROR, \@command ); 
     234    } 
     235    $self->{ 'writers' } = \%writers; 
     236    return; 
     237
     238 
     239sub get_archive_filename { 
     240    my $self = shift; 
     241    my ( $type, $compression ) = @_; 
     242 
     243    my $ext = $compression eq 'none' ? '' : ext_for_compression( $compression ); 
     244 
     245    my $filename = $self->{ 'filename-template' }; 
     246    $filename =~ s/__FILETYPE__/$type/g; 
     247    $filename =~ s/__CEXT__/$ext/g; 
     248 
     249    return $filename; 
     250
     251 
     252sub stop_pg_backup { 
     253    my $self = shift; 
     254 
     255    $self->prepare_temp_directory(); 
     256 
     257    my @command = ( @{ $self->{ 'psql' } }, "SELECT pg_stop_backup()" ); 
     258 
     259    $self->log->time_start( 'pg_stop_backup()' ) if $self->verbose; 
     260    my $status = run_command( $self->{ 'temp-dir' }, @command ); 
     261    $self->log->time_finish( 'pg_stop_backup()' ) if $self->verbose; 
     262 
     263    $self->clean_and_die( 'Running pg_stop_backup() failed: %s', $status ) if $status->{ 'error_code' }; 
     264 
     265    $status->{ 'stdout' } =~ s/\s*\z//; 
     266    $self->log->log( q{pg_stop_backup('omnipitr') returned %s.}, $status->{ 'stdout' } ); 
     267 
     268    my $subdir = basename( $self->{ 'data-dir' } ); 
     269 
     270    unlink( $self->{ 'xlogs' } ); 
     271 
     272    return; 
     273
     274 
     275sub start_pg_backup { 
     276    my $self = shift; 
     277 
     278    my $subdir = basename( $self->{ 'data-dir' } ); 
     279    $self->clean_and_die( 'Cannot create directory %s : %s', $self->{ 'xlogs' } . '.real',                 $OS_ERROR ) unless mkdir( $self->{ 'xlogs' } . '.real' ); 
     280    $self->clean_and_die( 'Cannot create directory %s : %s', $self->{ 'xlogs' } . ".real/$subdir",         $OS_ERROR ) unless mkdir( $self->{ 'xlogs' } . ".real/$subdir" ); 
     281    $self->clean_and_die( 'Cannot create directory %s : %s', $self->{ 'xlogs' } . ".real/$subdir/pg_xlog", $OS_ERROR ) unless mkdir( $self->{ 'xlogs' } . ".real/$subdir/pg_xlog" ); 
     282    $self->clean_and_die( 'Cannot symlink %s to %s: %s', $self->{ 'xlogs' } . ".real/$subdir/pg_xlog", $self->{ 'xlogs' } . $OS_ERROR ) 
     283        unless symlink( $self->{ 'xlogs' } . ".real/$subdir/pg_xlog", $self->{ 'xlogs' } ); 
     284 
     285    $self->prepare_temp_directory(); 
     286 
     287    my @command = ( @{ $self->{ 'psql' } }, "SELECT pg_start_backup('omnipitr')" ); 
     288 
     289    $self->log->time_start( 'pg_start_backup()' ) if $self->verbose; 
     290    my $status = run_command( $self->{ 'temp-dir' }, @command ); 
     291    $self->log->time_finish( 'pg_start_backup()' ) if $self->verbose; 
     292 
     293    $self->clean_and_die( 'Running pg_start_backup() failed: %s', $status ) if $status->{ 'error_code' }; 
     294 
     295    $status->{ 'stdout' } =~ s/\s*\z//; 
     296    $self->log->log( q{pg_start_backup('omnipitr') returned %s.}, $status->{ 'stdout' } ); 
     297 
     298    return; 
     299
     300 
     301sub clean_and_die { 
     302    my $self          = shift; 
     303    my @msg_with_args = @_; 
     304    rmtree( $self->{ 'xlogs' } . '.real', $self->{ 'xlogs' } ); 
     305    $self->log->fatal( @msg_with_args ); 
    26306} 
    27307 
     
    49329        $base->{ $type } = $tmp_dir; 
    50330    } 
     331 
    51332    return; 
    52333} 
     
    71352sub prepare_temp_directory { 
    72353    my $self = shift; 
     354    return if $self->{ 'temp-dir-prepared' }; 
    73355    my $full_temp_dir = File::Spec->catfile( $self->{ 'temp-dir' }, basename( $PROGRAM_NAME ) ); 
    74356    mkpath( $full_temp_dir ); 
     
    78360} 
    79361 
    80 =head1 read_state() 
    81  
    82 Helper function to read state from state file. 
    83  
    84 Name of state file is the same as filename of WAL segment being archived, but it is in state-dir. 
    85  
    86 =cut 
    87  
    88 sub read_state { 
    89     my $self = shift; 
    90     $self->{ 'state' } = {}; 
    91  
    92     return unless $self->{ 'state-dir' }; 
    93  
    94     $self->{ 'state-file' } = File::Spec->catfile( $self->{ 'state-dir' }, 'omnipitr-backup-master.state' ); 
    95     return unless -f $self->{ 'state-file' }; 
    96     $self->{ 'state' } = retrieve( $self->{ 'state-file' } ); 
    97     return; 
    98 } 
    99  
    100 =head1 save_state() 
    101  
    102 Helper function to save state to state-file. 
    103  
    104 =cut 
    105  
    106 sub save_state { 
    107     my $self = shift; 
    108  
    109     return unless $self->{ 'state-file' }; 
    110  
    111     store( $self->{ 'state' }, $self->{ 'state-file' } ); 
    112  
    113     return; 
    114 } 
    115  
    116362=head1 get_list_of_all_necessary_compressions() 
    117363 
     
    151397 
    152398    my %args = ( 
    153         'data-dir'          => '.', 
    154         'temp-dir'          => $ENV{ 'TMPDIR' } || '/tmp', 
     399        'temp-dir' => $ENV{ 'TMPDIR' } || '/tmp', 
    155400        'gzip-path'         => 'gzip', 
    156401        'bzip2-path'        => 'bzip2', 
     
    164409    ); 
    165410 
    166     croak( 'Error while reading command line arguments. Please check documentation in doc/omnipitr-master-backup.pod' ) 
     411    croak( 'Error while reading command line arguments. Please check documentation in doc/omnipitr-backup-master.pod' ) 
    167412        unless GetOptions( 
    168413        \%args, 
     
    179424        'filename-template|f=s', 
    180425        'pid-file', 
    181         'verbose|v=s', 
     426        'verbose|v', 
    182427        'gzip-path|gp=s', 
    183428        'bzip2-path|bp=s', 
     
    187432        'tar-path|tp=s', 
    188433        'rsync-path|rp=s', 
    189         'state-dir|s=s', 
    190434        ); 
    191435 
     
    227471    $self->log->log( 'Called with parameters: %s', join( ' ', @argv_copy ) ) if $self->verbose; 
    228472 
     473    my @psql = (); 
     474    push @psql, $self->{ 'psql-path' }; 
     475    push @psql, '-qAtX'; 
     476    push @psql, ( '-U', $self->{ 'username' } ) if $self->{ 'username' }; 
     477    push @psql, ( '-d', $self->{ 'database' } ) if $self->{ 'database' }; 
     478    push @psql, ( '-h', $self->{ 'host' } )     if $self->{ 'host' }; 
     479    push @psql, ( '-p', $self->{ 'port' } )     if $self->{ 'port' }; 
     480    push @psql, '-c'; 
     481    $self->{ 'psql' } = \@psql; 
     482 
    229483    return; 
    230484} 
     
    242496    my $self = shift; 
    243497 
     498    $self->log->fatal( 'Data-dir was not provided!' ) unless defined $self->{ 'data-dir' }; 
     499    $self->log->fatal( 'Provided data-dir (%s) does not exist!',   $self->{ 'data-dir' } ) unless -e $self->{ 'data-dir' }; 
     500    $self->log->fatal( 'Provided data-dir (%s) is not directory!', $self->{ 'data-dir' } ) unless -d $self->{ 'data-dir' }; 
     501    $self->log->fatal( 'Provided data-dir (%s) is not readable!',  $self->{ 'data-dir' } ) unless -r $self->{ 'data-dir' }; 
     502 
    244503    my $dst_count = scalar( @{ $self->{ 'destination' }->{ 'local' } } ) + scalar( @{ $self->{ 'destination' }->{ 'remote' } } ); 
    245504    $self->log->fatal( "No --dst-* has been provided!" ) if 0 == $dst_count; 
    246505 
    247     if ( 1 < $dst_count ) { 
    248         $self->log->fatal( "More than 1 --dst-* has been provided, but no --state-dir!" ) if !$self->{ 'state-dir' }; 
    249         $self->log->fatal( "Given --state-dir (%s) does not exist",     $self->{ 'state-dir' } ) unless -e $self->{ 'state-dir' }; 
    250         $self->log->fatal( "Given --state-dir (%s) is not a directory", $self->{ 'state-dir' } ) unless -d $self->{ 'state-dir' }; 
    251         $self->log->fatal( "Given --state-dir (%s) is not writable",    $self->{ 'state-dir' } ) unless -w $self->{ 'state-dir' }; 
    252     } 
    253  
    254506    $self->log->fatal( "Filename template does not contain __FILETYPE__ placeholder!" ) unless $self->{ 'filename-template' } =~ /__FILETYPE__/; 
    255507    $self->log->fatal( "Filename template cannot contain / or \\ characters!" ) if $self->{ 'filename-template' } =~ m{[/\\]}; 
    256508 
     509    $self->log->fatal( "Xlogs dir (--xlogs) was not given! Cannot work without it" ) unless defined $self->{ 'xlogs' }; 
     510    $self->{ 'xlogs' } =~ s{/+$}{}; 
     511    $self->log->fatal( "Xlogs dir (%s) already exists! It shouldn't.",           $self->{ 'xlogs' } ) if -e $self->{ 'xlogs' }; 
     512    $self->log->fatal( "Xlogs side dir (%s.real) already exists! It shouldn't.", $self->{ 'xlogs' } ) if -e $self->{ 'xlogs' } . '.real'; 
     513 
     514    my $xlog_parent = dirname( $self->{ 'xlogs' } ); 
     515    $self->log->fatal( 'Xlogs dir (%s) parent (%s) does not exist. Cannot continue.',   $self->{ 'xlogs' }, $xlog_parent ) unless -e $xlog_parent; 
     516    $self->log->fatal( 'Xlogs dir (%s) parent (%s) is not directory. Cannot continue.', $self->{ 'xlogs' }, $xlog_parent ) unless -d $xlog_parent; 
     517    $self->log->fatal( 'Xlogs dir (%s) parent (%s) is not writable. Cannot continue.',  $self->{ 'xlogs' }, $xlog_parent ) unless -w $xlog_parent; 
     518 
    257519    return; 
    258520}