root/trunk/omnipitr/lib/OmniPITR/Program/Backup/Master.pm

Revision 143, 20.7 kB (checked in by depesz, 8 years ago)

developer docs

Line 
1 package OmniPITR::Program::Backup::Master;
2 use strict;
3 use warnings;
4
5 use base qw( OmniPITR::Program );
6
7 use Carp;
8 use OmniPITR::Tools qw( :all );
9 use English qw( -no_match_vars );
10 use File::Basename;
11 use Sys::Hostname;
12 use POSIX qw( strftime );
13 use File::Spec;
14 use File::Path qw( mkpath rmtree );
15 use File::Copy;
16 use Storable;
17 use Cwd;
18 use Getopt::Long qw( :config no_ignore_case );
19
=head1 run()

Main entry point - performs the whole backup.

Steps are executed in this order:

=over

=item * collect all compression schemes requested by destinations

=item * choose local base directory for every compression scheme

=item * call pg_start_backup(), then archive the data directory

=item * call pg_stop_backup(), then archive xlogs gathered in the meantime

=item * deliver archives to all destinations

=back

After logging final message, exits with status 1 if any step set $self->{'had_errors'}.

=cut

sub run {
    my ( $self ) = @_;

    my @steps = qw(
        get_list_of_all_necessary_compressions
        choose_base_local_destinations
        start_pg_backup
        compress_pgdata
        stop_pg_backup
        compress_xlogs
        deliver_to_all_destinations
    );

    # Named loop variable (not $_) so called methods cannot clobber the step list.
    for my $step ( @steps ) {
        $self->$step();
    }

    my $failed = $self->{ 'had_errors' };
    $self->log->log( 'All done%s.', $failed ? ' with errors' : '' );
    exit( 1 ) if $failed;

    return;
}
47
=head1 deliver_to_all_destinations()

Single call point for delivery: first copies archives to local destinations, then sends them to remote ones.

=cut

sub deliver_to_all_destinations {
    my ( $self ) = @_;

    for my $delivery ( qw( deliver_to_all_local_destinations deliver_to_all_remote_destinations ) ) {
        $self->$delivery();
    }

    return;
}
63
=head1 deliver_to_all_local_destinations()

Copies data and xlog archives from the base directory of each compression to every local destination.

Destinations whose path equals the base directory are skipped, because archives were written there directly.

Copy failures are logged, and set $self->{'had_errors'}. They do not stop delivery to other destinations.

=cut

sub deliver_to_all_local_destinations {
    my ( $self ) = @_;

    my $locals = $self->{ 'destination' }->{ 'local' };
    return unless $locals;

    DESTINATION:
    for my $dst ( @{ $locals } ) {
        my $compression = $dst->{ 'compression' };
        my $source_dir  = $self->{ 'base' }->{ $compression };

        # Archives were produced directly in this directory - nothing to copy.
        next DESTINATION if $dst->{ 'path' } eq $source_dir;

        for my $type ( qw( data xlog ) ) {
            my $filename = $self->get_archive_filename( $type, $compression );
            my $from     = File::Spec->catfile( $source_dir, $filename );
            my $to       = File::Spec->catfile( $dst->{ 'path' }, $filename );

            my $time_msg = sprintf 'Copying %s to %s', $from, $to;
            $self->log->time_start( $time_msg ) if $self->verbose;
            my $copied = copy( $from, $to );
            $self->log->time_finish( $time_msg ) if $self->verbose;

            next if $copied;

            $self->log->error( 'Cannot copy %s to %s : %s', $from, $to, $OS_ERROR );
            $self->{ 'had_errors' } = 1;
        }
    }
    return;
}
100
=head1 deliver_to_all_remote_destinations()

Sends data and xlog archives to every remote destination by running the configured rsync program.

The destination argument is the destination path, normalized to end with a single trailing slash, followed by the archive filename.

Failures (non-zero error_code from run_command) are logged, and set $self->{'had_errors'}.

=cut

sub deliver_to_all_remote_destinations {
    my ( $self ) = @_;

    my $remotes = $self->{ 'destination' }->{ 'remote' };
    return unless $remotes;

    for my $dst ( @{ $remotes } ) {
        my $compression = $dst->{ 'compression' };
        my $source_dir  = $self->{ 'base' }->{ $compression };

        # Collapse any trailing slashes into exactly one, so filename can be appended.
        ( my $remote_prefix = $dst->{ 'path' } ) =~ s{/*\z}{/};

        for my $type ( qw( data xlog ) ) {
            my $filename = $self->get_archive_filename( $type, $compression );
            my $from     = File::Spec->catfile( $source_dir, $filename );
            my $to       = $remote_prefix . $filename;

            my $time_msg = sprintf 'Copying %s to %s', $from, $to;
            $self->log->time_start( $time_msg ) if $self->verbose;
            my $response = run_command( $self->{ 'temp-dir' }, $self->{ 'rsync-path' }, $from, $to );
            $self->log->time_finish( $time_msg ) if $self->verbose;

            next unless $response->{ 'error_code' };

            $self->log->error( 'Cannot send archive %s to %s: %s', $from, $to, $response );
            $self->{ 'had_errors' } = 1;
        }
    }
    return;
}
137
=head1 compress_xlogs()

Wrapper function which encapsulates all work required to compress xlog segments that accumulated during backup of data directory.

Segments are read from the "<xlogs>.real" side directory, which is laid out as <basename of data-dir>/pg_xlog, so the archive mirrors the data directory structure.

Once the archive is written, the side directory is removed. validate_args() refuses to start when "<xlogs>.real" already exists, so leaving it in place would make the next backup fail.

=cut

sub compress_xlogs {
    my $self = shift;

    my $side_dir = $self->{ 'xlogs' } . '.real';

    $self->log->time_start( 'Compressing xlogs' ) if $self->verbose;
    $self->start_writers( 'xlog' );

    $self->tar_and_compress(
        'work_dir' => $side_dir,
        'tar_dir'  => basename( $self->{ 'data-dir' } ),
    );
    $self->log->time_finish( 'Compressing xlogs' ) if $self->verbose;

    # Archive is complete - drop the temporary xlog copy so the next run can create it again.
    # Arrayref form: with the legacy rmtree() interface, a second positional argument means "verbose".
    rmtree( [ $side_dir ] );

    return;
}
157
=head1 compress_pgdata()

Archives the PostgreSQL data directory into every compression writer.

tar runs from the parent of data-dir, so paths inside the archive start with the data directory's own name.

Server logs, xlog segments, archive status files and postmaster.pid are excluded.

=cut

sub compress_pgdata {
    my ( $self ) = @_;

    my $label    = 'Compressing $PGDATA';
    my $data_dir = $self->{ 'data-dir' };
    my @skipped  = ( 'pg_log/*', 'pg_xlog/0*', 'pg_xlog/archive_status/*', 'postmaster.pid' );

    $self->log->time_start( $label ) if $self->verbose;
    $self->start_writers( 'data' );

    $self->tar_and_compress(
        'work_dir' => dirname( $data_dir ),
        'tar_dir'  => basename( $data_dir ),
        'excludes' => \@skipped,
    );

    $self->log->time_finish( $label ) if $self->verbose;
    return;
}
178
=head1 tar_and_compress()

Worker function which does all of the actual tar, and sending data to compression filehandles.

Takes hash (not hashref) as argument, and uses following keys from it:

=over

=item * tar_dir - which directory to compress

=item * work_dir - what should be current working directory when executing tar

=item * excludes - optional key, that (if exists) is treated as arrayref of shell globs (tar dir) of items to exclude from backup

=back

tar output is read in 8kB chunks, and every chunk is written to each filehandle in $self->{'writers'}. The writers are closed, and removed from the object, afterwards.

Short writes to a writer are fatal (clean_and_die). Read errors from tar, and failures when closing a writer (for example, compressor exiting with non-zero status), are logged and set $self->{'had_errors'}.

If tar will print anything to STDERR it will be logged. Error status code of tar itself is ignored, as it is expected that tar will generate errors (due to files modified while archiving).

=cut

sub tar_and_compress {
    my $self = shift;
    my %ARGS = @_;

    my @compression_command = ( $self->{ 'nice-path' }, $self->{ 'tar-path' }, 'cf', '-' );
    if ( $ARGS{ 'excludes' } ) {
        push @compression_command, map { sprintf '--exclude=%s/%s', $ARGS{ 'tar_dir' }, $_ } @{ $ARGS{ 'excludes' } };
    }
    push @compression_command, $ARGS{ 'tar_dir' };

    # quotemeta keeps the shell from expanding exclude globs - tar receives them literally.
    my $compression_str = join ' ', map { quotemeta $_ } @compression_command;

    $self->prepare_temp_directory();
    my $tar_stderr_filename = File::Spec->catfile( $self->{ 'temp-dir' }, 'tar.stderr' );
    $compression_str .= ' 2> ' . quotemeta( $tar_stderr_filename );

    my $previous_dir = getcwd;
    if ( $ARGS{ 'work_dir' } ) {

        # Running tar from the wrong directory would silently archive the wrong tree.
        $self->clean_and_die( 'Cannot chdir to %s : %s', $ARGS{ 'work_dir' }, $OS_ERROR ) unless chdir $ARGS{ 'work_dir' };
    }

    my $tar;
    unless ( open $tar, '-|', $compression_str ) {
        $self->clean_and_die( 'Cannot start tar (%s) : %s', $compression_str, $OS_ERROR );
    }

    chdir $previous_dir if $ARGS{ 'work_dir' };

    my $buffer;
    my $len;
    while ( $len = sysread( $tar, $buffer, 8192 ) ) {
        for my $type ( keys %{ $self->{ 'writers' } } ) {
            my $written = syswrite( $self->{ 'writers' }->{ $type }, $buffer, $len );
            next if defined $written && $written == $len;
            $self->clean_and_die(
                'Writing %u bytes to filehandle for <%s> compression wrote only %u bytes ?! (%s)',
                $len, $type, ( defined $written ? $written : 0 ), $OS_ERROR
            );
        }
    }

    # sysread returns undef (not 0) on error - archive is most likely truncated.
    unless ( defined $len ) {
        $self->log->error( 'Error while reading output of tar (%s): %s', $compression_str, $OS_ERROR );
        $self->{ 'had_errors' } = 1;
    }
    close $tar;

    for my $type ( keys %{ $self->{ 'writers' } } ) {

        # For piped writers close() waits for the compressor and fails on non-zero exit status.
        next if close $self->{ 'writers' }->{ $type };
        $self->log->error( 'Closing writer for <%s> compression failed: %s (child status: %d)', $type, $OS_ERROR, $CHILD_ERROR >> 8 );
        $self->{ 'had_errors' } = 1;
    }

    delete $self->{ 'writers' };

    my $stderr;
    unless ( open $stderr, '<', $tar_stderr_filename ) {
        $self->log->log( 'Cannot open tar stderr file (%s) for reading: %s', $tar_stderr_filename, $OS_ERROR );
        return;
    }
    my $stderr_output = do { local $/; <$stderr> };
    close $stderr;

    # Remove the file even when it is empty, so nothing stale is left for the next call.
    unlink $tar_stderr_filename;

    return unless $stderr_output;
    $self->log->log( 'Tar (%s) generated these output on stderr:', $compression_str );
    $self->log->log( '==============================================' );
    $self->log->log( '%s', $stderr_output );
    $self->log->log( '==============================================' );
    return;
}
260
=head1 start_writers()

Starts set of filehandles, which write to file, or to compression program, to create final archives.

Takes the archive type (for example "data" or "xlog"), which is passed to get_archive_filename().

Each compression schema in $self->{'base'} gets its own filehandle, stored in $self->{'writers'}, keyed by compression name:

=over

=item * "none" - plain file opened for writing in the base directory

=item * anything else - pipe to "<nice> <compressor> --stdout -", redirected by the shell to the archive file

=back

Failure to open any writer is fatal (clean_and_die).

=cut

sub start_writers {
    my $self      = shift;
    my $data_type = shift;

    my %writers = ();

    # keys() instead of each(): avoids depending on the hash's shared iterator state.
    COMPRESSION:
    for my $type ( keys %{ $self->{ 'base' } } ) {
        my $dst_path       = $self->{ 'base' }->{ $type };
        my $filename       = $self->get_archive_filename( $data_type, $type );
        my $full_file_path = File::Spec->catfile( $dst_path, $filename );

        if ( $type eq 'none' ) {
            if ( open my $fh, '>', $full_file_path ) {
                $writers{ $type } = $fh;

                # Path is a format argument, never part of the format, so "%" in paths is safe.
                $self->log->log( 'Starting "none" writer to %s', $full_file_path ) if $self->verbose;
                next COMPRESSION;
            }
            $self->clean_and_die( 'Cannot write to %s : %s', $full_file_path, $OS_ERROR );
        }

        my @command = map { quotemeta $_ } ( $self->{ 'nice-path' }, $self->{ $type . '-path' }, '--stdout', '-' );
        push @command, ( '>', quotemeta( $full_file_path ) );

        $self->log->log( 'Starting "%s" writer to %s', $type, $full_file_path ) if $self->verbose;
        if ( open my $fh, '|-', join( ' ', @command ) ) {
            $writers{ $type } = $fh;
            next COMPRESSION;
        }
        $self->clean_and_die( 'Cannot open command. Error: %s, Command: %s', $OS_ERROR, \@command );
    }
    $self->{ 'writers' } = \%writers;
    return;
}
303
=head1 get_archive_filename()

Builds archive filename from the (already strftime/hostname-expanded) filename-template option.

Takes archive type, which replaces every __FILETYPE__, and compression name. Every __CEXT__ is replaced by the extension returned by ext_for_compression(), or by an empty string for compression "none".

=cut

sub get_archive_filename {
    my ( $self, $type, $compression ) = @_;

    my $extension = '';
    $extension = ext_for_compression( $compression ) unless $compression eq 'none';

    ( my $name = $self->{ 'filename-template' } ) =~ s/__FILETYPE__/$type/g;
    $name =~ s/__CEXT__/$extension/g;

    return $name;
}
322
=head1 stop_pg_backup()

Runs pg_stop_backup() PostgreSQL function (through psql command built in read_args()), which is crucial in backup process.

This happens after data directory compression, but before compression of xlogs.

Failure of the psql call is fatal (clean_and_die).

Afterwards the xlogs symlink (dst-backup for omnipitr-archive) is removed. The "<xlogs>.real" directory it pointed to is left in place, because the xlog archive is built from it afterwards. Failure to remove the symlink is logged, and sets $self->{'had_errors'}.

=cut

sub stop_pg_backup {
    my $self = shift;

    $self->prepare_temp_directory();

    my @command = ( @{ $self->{ 'psql' } }, "SELECT pg_stop_backup()" );

    $self->log->time_start( 'pg_stop_backup()' ) if $self->verbose;
    my $status = run_command( $self->{ 'temp-dir' }, @command );
    $self->log->time_finish( 'pg_stop_backup()' ) if $self->verbose;

    $self->clean_and_die( 'Running pg_stop_backup() failed: %s', $status ) if $status->{ 'error_code' };

    $status->{ 'stdout' } =~ s/\s*\z//;
    $self->log->log( q{pg_stop_backup() returned %s.}, $status->{ 'stdout' } );

    # A leftover symlink would keep receiving xlogs, and makes validate_args() reject the next run.
    unless ( unlink( $self->{ 'xlogs' } ) ) {
        $self->log->error( 'Cannot remove xlogs symlink %s : %s', $self->{ 'xlogs' }, $OS_ERROR );
        $self->{ 'had_errors' } = 1;
    }

    return;
}
355
=head1 start_pg_backup()

Executes pg_start_backup('omnipitr') postgresql function. Before that, it creates temporary destination for xlogs (dst-backup for omnipitr-archive):

=over

=item * directory "<xlogs>.real/<basename of data-dir>/pg_xlog"

=item * symlink "<xlogs>" pointing to it

=back

Any failure (directory, symlink or psql call) is fatal (clean_and_die).

=cut

sub start_pg_backup {
    my $self = shift;

    my $subdir      = basename( $self->{ 'data-dir' } );
    my $real_dir    = $self->{ 'xlogs' } . '.real';
    my $xlog_target = "$real_dir/$subdir/pg_xlog";

    # mkdir (not mkpath) on purpose: each level must be newly created by this run.
    for my $dir ( $real_dir, "$real_dir/$subdir", $xlog_target ) {
        $self->clean_and_die( 'Cannot create directory %s : %s', $dir, $OS_ERROR ) unless mkdir( $dir );
    }
    $self->clean_and_die( 'Cannot symlink %s to %s: %s', $xlog_target, $self->{ 'xlogs' }, $OS_ERROR )
        unless symlink( $xlog_target, $self->{ 'xlogs' } );

    $self->prepare_temp_directory();

    my @command = ( @{ $self->{ 'psql' } }, "SELECT pg_start_backup('omnipitr')" );

    $self->log->time_start( 'pg_start_backup()' ) if $self->verbose;
    my $status = run_command( $self->{ 'temp-dir' }, @command );
    $self->log->time_finish( 'pg_start_backup()' ) if $self->verbose;

    $self->clean_and_die( 'Running pg_start_backup() failed: %s', $status ) if $status->{ 'error_code' };

    $status->{ 'stdout' } =~ s/\s*\z//;
    $self->log->log( q{pg_start_backup('omnipitr') returned %s.}, $status->{ 'stdout' } );

    return;
}
387
=head1 clean_and_die()

Helper function called by other parts of code. It removes the temporary destination for xlogs (both the "<xlogs>.real" directory tree and the "<xlogs>" symlink), then calls log->fatal() with passed message format and arguments.

=cut

sub clean_and_die {
    my $self          = shift;
    my @msg_with_args = @_;

    # Paths must be passed as an arrayref: with the legacy rmtree() interface a second
    # positional argument is the "verbose" flag, not another path to remove.
    rmtree( [ $self->{ 'xlogs' } . '.real', $self->{ 'xlogs' } ] );
    $self->log->fatal( @msg_with_args );
}
400
=head1 choose_base_local_destinations()

Picks one local directory per compression scheme (from $self->{'compressions'}), and stores the mapping in $self->{'base'}. Archives are written there, then copied or sent to other destinations.

For each compression, the first local destination using it wins.

Compressions used only by remote destinations get a subdirectory (named after the compression) of the prepared temp directory. This situation is also logged, as it is usually not desired.

=cut

sub choose_base_local_destinations {
    my ( $self ) = @_;

    my %base_for = map { ( $_, undef ) } @{ $self->{ 'compressions' } };
    $self->{ 'base' } = \%base_for;

    for my $dst ( @{ $self->{ 'destination' }->{ 'local' } } ) {
        my $compression = $dst->{ 'compression' };
        $base_for{ $compression } = $dst->{ 'path' } unless defined $base_for{ $compression };
    }

    my @unfilled = grep { !defined $base_for{ $_ } } keys %base_for;
    return unless @unfilled;

    $self->log->log( 'These compression(s) were given only for remote destinations. Usually this is not desired: %s', join( ', ', @unfilled ) );

    $self->prepare_temp_directory();
    for my $compression ( @unfilled ) {
        my $scratch_dir = File::Spec->catfile( $self->{ 'temp-dir' }, $compression );
        mkpath( $scratch_dir );
        $base_for{ $compression } = $scratch_dir;
    }

    return;
}
435
=head1 DESTROY()

Destructor for object - removes the temp directory created by prepare_temp_directory(), if it was ever created.

=cut

sub DESTROY {
    my ( $self ) = @_;

    my $prepared_dir = $self->{ 'temp-dir-prepared' };
    rmtree( $prepared_dir ) if $prepared_dir;

    return;
}
448
=head1 prepare_temp_directory()

Helper function, which builds path for temp directory, and creates it. Only the first call does any work.

Path is the given temp-dir plus the basename of the running program. For example, for temp-dir '/tmp' and program omnipitr-backup-master, the directory is /tmp/omnipitr-backup-master.

The resulting path replaces $self->{'temp-dir'}, and is also stored in $self->{'temp-dir-prepared'} (used by DESTROY() for cleanup).

=cut

sub prepare_temp_directory {
    my ( $self ) = @_;

    unless ( $self->{ 'temp-dir-prepared' } ) {
        my $dir = File::Spec->catfile( $self->{ 'temp-dir' }, basename( $PROGRAM_NAME ) );
        mkpath( $dir );
        @{ $self }{ 'temp-dir', 'temp-dir-prepared' } = ( $dir, $dir );
    }

    return;
}
468
=head1 get_list_of_all_necessary_compressions()

Collects the distinct compression names used by local and remote destinations, and stores them (in no particular order) as arrayref in $self->{'compressions'}.

This lets every compression be done only once, even when several destinations need the same format.

=cut

sub get_list_of_all_necessary_compressions {
    my ( $self ) = @_;

    my %seen;
    my @destination_lists = grep { $_ } map { $self->{ 'destination' }->{ $_ } } qw( local remote );
    for my $list ( @destination_lists ) {
        $seen{ $_->{ 'compression' } } = 1 for @{ $list };
    }

    $self->{ 'compressions' } = [ keys %seen ];
    return;
}
491
=head1 read_args()

Function which does all the parsing, and transformation of command line arguments (@ARGV).

Besides copying options into the object, it:

=over

=item * applies defaults: temp dir (TMPDIR environment variable, or /tmp), program paths, database name and filename template

=item * converts ^ to % in --log and --filename-template (filename template is then passed through strftime(); log template is handed to OmniPITR::Log)

=item * splits each unique --dst-local / --dst-remote value into optional compression prefix (gzip=, bzip2=, lzma=; "none" otherwise) and path

=item * replaces __HOSTNAME__ in filename template with hostname()

=item * creates logger object, and builds base psql command line (ending with -c) in $self->{'psql'}

=back

Croaks when arguments cannot be parsed, or when --log is missing.

=cut

sub read_args {
    my $self = shift;

    my @argv_copy = @ARGV;

    my %args = (
        'temp-dir' => $ENV{ 'TMPDIR' } || '/tmp',
        'gzip-path'         => 'gzip',
        'bzip2-path'        => 'bzip2',
        'lzma-path'         => 'lzma',
        'tar-path'          => 'tar',
        'nice-path'         => 'nice',
        'psql-path'         => 'psql',
        'rsync-path'        => 'rsync',
        'database'          => 'postgres',
        'filename-template' => '__HOSTNAME__-__FILETYPE__-^Y-^m-^d.tar__CEXT__',
    );

    croak( 'Error while reading command line arguments. Please check documentation in doc/omnipitr-backup-master.pod' )
        unless GetOptions(
        \%args,
        'data-dir|D=s',
        'database|d=s',
        'host|h=s',
        'port|p=i',
        'username|U=s',
        'xlogs|x=s',
        'dst-local|dl=s@',
        'dst-remote|dr=s@',
        'temp-dir|t=s',
        'log|l=s',
        'filename-template|f=s',

        # Takes a path. Declared as a bare flag, "--pid-file PATH" stored 1 and left PATH in @ARGV.
        'pid-file=s',
        'verbose|v',
        'gzip-path|gp=s',
        'bzip2-path|bp=s',
        'lzma-path|lp=s',
        'nice-path|np=s',
        'psql-path|pp=s',
        'tar-path|tp=s',
        'rsync-path|rp=s',
        );

    croak( '--log was not provided - cannot continue.' ) unless $args{ 'log' };

    # ^ is used on the command line instead of % (strftime escapes).
    for my $key ( qw( log filename-template ) ) {
        $args{ $key } =~ tr/^/%/;
    }

    for my $key ( grep { !/^dst-(?:local|remote)$/ } keys %args ) {
        $self->{ $key } = $args{ $key };
    }

    for my $type ( qw( local remote ) ) {
        my $D = [];
        $self->{ 'destination' }->{ $type } = $D;

        next unless defined $args{ 'dst-' . $type };

        my %temp_for_uniq = ();
        my @items = grep { !$temp_for_uniq{ $_ }++ } @{ $args{ 'dst-' . $type } };

        for my $item ( @items ) {
            my $current = { 'compression' => 'none', };
            if ( $item =~ s/\A(gzip|bzip2|lzma)=// ) {
                $current->{ 'compression' } = $1;
            }
            $current->{ 'path' } = $item;
            push @{ $D }, $current;
        }
    }

    $self->{ 'filename-template' } = strftime( $self->{ 'filename-template' }, localtime time() );
    $self->{ 'filename-template' } =~ s/__HOSTNAME__/hostname()/ge;

    # We do it here so it will actually work for reporting problems in validation
    $self->{ 'log_template' } = $args{ 'log' };
    $self->{ 'log' }          = OmniPITR::Log->new( $self->{ 'log_template' } );

    $self->log->log( 'Called with parameters: %s', join( ' ', @argv_copy ) ) if $self->verbose;

    my @psql = ();
    push @psql, $self->{ 'psql-path' };
    push @psql, '-qAtX';
    push @psql, ( '-U', $self->{ 'username' } ) if $self->{ 'username' };
    push @psql, ( '-d', $self->{ 'database' } ) if $self->{ 'database' };
    push @psql, ( '-h', $self->{ 'host' } )     if $self->{ 'host' };
    push @psql, ( '-p', $self->{ 'port' } )     if $self->{ 'port' };

    # SQL to run is appended by callers (see start_pg_backup/stop_pg_backup).
    push @psql, '-c';
    $self->{ 'psql' } = \@psql;

    return;
}
590
=head1 validate_args()

Does all necessary validation of given command line arguments. Every problem is reported through log->fatal().

Checks:

=over

=item * data-dir is given, exists, is a directory and is readable

=item * at least one --dst-local or --dst-remote is given

=item * filename template contains __FILETYPE__, and no / or \ characters

=item * --xlogs is given (trailing slashes are stripped from it), neither it nor "<xlogs>.real" exists yet, and its parent exists, is a directory and is writable

=back

One exception is for compression programs paths - technically, it could be validated in here, but benefit would be pretty limited, and code to do so relatively complex, as compression program path
might, but doesn't have to be actual file path - it might be just program name (without path), which is the default.

=cut

sub validate_args {
    my $self = shift;
    my $log  = $self->log;

    # Data directory.
    my $data_dir = $self->{ 'data-dir' };
    $log->fatal( 'Data-dir was not provided!' ) unless defined $data_dir;
    my @data_dir_checks = (
        [ sub { -e $_[ 0 ] }, 'Provided data-dir (%s) does not exist!' ],
        [ sub { -d $_[ 0 ] }, 'Provided data-dir (%s) is not directory!' ],
        [ sub { -r $_[ 0 ] }, 'Provided data-dir (%s) is not readable!' ],
    );
    for my $check ( @data_dir_checks ) {
        my ( $passes, $message ) = @{ $check };
        $log->fatal( $message, $data_dir ) unless $passes->( $data_dir );
    }

    # Destinations.
    my $dst_count = 0;
    for my $kind ( qw( local remote ) ) {
        $dst_count += scalar @{ $self->{ 'destination' }->{ $kind } };
    }
    $log->fatal( "No --dst-* has been provided!" ) unless $dst_count;

    # Filename template.
    my $template = $self->{ 'filename-template' };
    $log->fatal( "Filename template does not contain __FILETYPE__ placeholder!" ) if $template !~ /__FILETYPE__/;
    $log->fatal( "Filename template cannot contain / or \\ characters!" ) if $template =~ m{[/\\]};

    # Xlogs directory - must not exist yet, but its parent must be usable.
    $log->fatal( "Xlogs dir (--xlogs) was not given! Cannot work without it" ) if !defined $self->{ 'xlogs' };
    $self->{ 'xlogs' } =~ s{/+$}{};
    my $xlogs = $self->{ 'xlogs' };
    $log->fatal( "Xlogs dir (%s) already exists! It shouldn't.",           $xlogs ) if -e $xlogs;
    $log->fatal( "Xlogs side dir (%s.real) already exists! It shouldn't.", $xlogs ) if -e "$xlogs.real";

    my $xlog_parent = dirname( $xlogs );
    my @parent_checks = (
        [ sub { -e $_[ 0 ] }, 'Xlogs dir (%s) parent (%s) does not exist. Cannot continue.' ],
        [ sub { -d $_[ 0 ] }, 'Xlogs dir (%s) parent (%s) is not directory. Cannot continue.' ],
        [ sub { -w $_[ 0 ] }, 'Xlogs dir (%s) parent (%s) is not writable. Cannot continue.' ],
    );
    for my $check ( @parent_checks ) {
        my ( $passes, $message ) = @{ $check };
        $log->fatal( $message, $xlogs, $xlog_parent ) unless $passes->( $xlog_parent );
    }

    return;
}

1;
Note: See TracBrowser for help on using the browser.