root/trunk/omnipitr/lib/OmniPITR/Program/Backup/Slave.pm

Revision 175, 17.4 kB (checked in by depesz, 4 years ago)

Reftoring complete, omnipitr-backup-slave in beta

Line 
1 package OmniPITR::Program::Backup::Slave;
2 use strict;
3 use warnings;
4
5 use base qw( OmniPITR::Program::Backup );
6
7 use File::Spec;
8 use File::Basename;
9 use English qw( -no_match_vars );
10 use File::Copy;
11 use File::Path;
12 use Getopt::Long;
13 use Carp;
14 use POSIX qw( strftime );
15 use Sys::Hostname;
16 use OmniPITR::Tools qw( run_command ext_for_compression );
17
=head1 make_data_archive()

Produces the local .tar files (optionally compressed) holding the content
of PGDATA.

Steps, in order: pause xlog removal, remember the current control data as
the "initial" snapshot, compress PGDATA.

=cut

sub make_data_archive {
    my ( $self ) = @_;

    $self->pause_xlog_removal();

    # Kept under CONTROL/initial; make_backup_label_temp_file() and
    # make_dot_backup_file() read the checkpoint details back from there.
    my $initial_control_data = $self->get_control_data();
    $self->{ 'CONTROL' }->{ 'initial' } = $initial_control_data;

    $self->compress_pgdata();

    return;
}
32
=head1 make_xlog_archive()

Produces the local .tar files (optionally compressed) holding the xlogs
that are required to start PostgreSQL from the backup.

=cut

sub make_xlog_archive {
    my ( $self ) = @_;

    # The "final" control data is recorded while waiting, and
    # compress_xlogs() (via make_dot_backup_file()) depends on it.
    $self->wait_for_checkpoint_location_change();

    $self->compress_xlogs();

    # Everything needed is archived now - segment removal may resume.
    $self->unpause_xlog_removal();

    return;
}
47
=head1 compress_xlogs()

Does everything that is needed to archive the xlog segments that piled up
while the data directory was being backed up: writes the .backup file,
uncompresses the wal archive if necessary, and then tars (and optionally
compresses) the segments together with the .backup file.

Inside the archive both the wal-archive directory and the temp directory
are renamed to I<basename of data-dir>/pg_xlog.

=cut

sub compress_xlogs {
    my ( $self ) = @_;

    $self->make_dot_backup_file();
    $self->uncompress_wal_archive_segments();

    my $timer_name = 'Compressing xlogs';
    $self->log->time_start( $timer_name ) if $self->verbose;
    $self->start_writers( 'xlog' );

    # Removes every leading and every trailing slash from given text.
    my $trim_slashes = sub {
        my ( $text ) = @_;
        $text =~ s{^/*}{};
        $text =~ s{/*$}{};
        return $text;
    };

    # Read only at this point: uncompress_wal_archive_segments() may have
    # pointed the source path at a directory inside temp-dir.
    my $source_path = $self->{ 'source' }->{ 'path' };
    my $temp_dir    = $self->{ 'temp-dir' };
    my $source_name = basename( $source_path );

    my $transform_command = sprintf 's#^\(%s\|%s\)#%s#',
        $trim_slashes->( $source_name ),
        $trim_slashes->( $temp_dir ),
        basename( $self->{ 'data-dir' } ) . '/pg_xlog';

    $self->tar_and_compress(
        'work_dir'  => dirname( $source_path ),
        'tar_dir'   => [ $source_name, File::Spec->catfile( $temp_dir, $self->{ 'dot_backup_filename' } ) ],
        'transform' => $transform_command,
    );

    $self->log->time_finish( $timer_name ) if $self->verbose;

    return;
}
85
=head1 uncompress_wal_archive_segments()

In case walarchive (--source option) is compressed, L<omnipitr-backup-slave>
needs to uncompress files to temp directory before making archive - so that
the archive will be easier to use.

This work is being done in this function: every file in the wal archive
that ends with the extension of the configured compression is copied to
I<temp-dir>/uncompresses_pg_xlogs and uncompressed there. From then on the
source path of the object points at that directory.

Nothing is done when source compression is C<none>.

=cut

sub uncompress_wal_archive_segments {
    my $self = shift;

    my $compression = $self->{ 'source' }->{ 'compression' };
    return if 'none' eq $compression;

    my $old_source = $self->{ 'source' }->{ 'path' };
    my $new_source = File::Spec->catfile( $self->{ 'temp-dir' }, 'uncompresses_pg_xlogs' );
    $self->{ 'source' }->{ 'path' } = $new_source;

    mkpath( [ $new_source ], 0, oct( "755" ) );

    opendir my $dir, $old_source or $self->log->fatal( 'Cannot open wal-archive (%s) : %s', $old_source, $OS_ERROR );
    my $extension = ext_for_compression( $compression );
    my @wal_segments = sort grep { -f File::Spec->catfile( $old_source, $_ ) && /\Q$extension\E\z/ } readdir( $dir );

    # Handles obtained from opendir() have to be released with closedir();
    # plain close() fails on them and leaves the directory open.
    closedir $dir;

    $self->log->log( '%s wal segments have to be uncompressed', scalar @wal_segments );

    for my $segment ( @wal_segments ) {
        my $old_file = File::Spec->catfile( $old_source, $segment );
        my $new_file = File::Spec->catfile( $new_source, $segment );

        # Work on a copy, so that the wal archive itself is left untouched.
        copy( $old_file, $new_file ) or $self->log->fatal( 'Cannot copy %s to %s: %s', $old_file, $new_file, $OS_ERROR );
        $self->log->log( 'File copied: %s -> %s', $old_file, $new_file );

        # Runs: <nice-path> <COMPRESSION-path> -d <copied file>
        my $response = run_command( $self->{ 'temp-dir' }, $self->{ 'nice-path' }, $self->{ $compression . '-path' }, '-d', $new_file );
        if ( $response->{ 'error_code' } ) {

            # NOTE(review): $response is a hash reference, so %s will most
            # likely render it as HASH(0x...) - confirm how the logger
            # formats references before relying on this message.
            $self->log->fatal( 'Error while uncompressing wal segment %s: %s', $new_file, $response );
        }
    }
    return;
}
125
=head1 make_dot_backup_file()

Make I<SEGMENT>.I<OFFSET>.backup file that will be included in xlog archive.

This file contains vital information like start and end position of WAL
replay that is required to get consistent state.

Content is based on the lines prepared earlier by
make_backup_label_temp_file(), with two lines added: C<STOP WAL LOCATION>
and C<STOP TIME>.

The file is written to temp directory, and its name (without directory) is
stored in the object as C<dot_backup_filename>. Problems with writing the
file are reported as fatal.

=cut

sub make_dot_backup_file {
    my $self = shift;

    my $redo_location  = $self->{ 'CONTROL' }->{ 'initial' }->{ "Latest checkpoint's REDO location" };
    my $final_location = $self->{ 'CONTROL' }->{ 'final' }->{ "Latest checkpoint location" };
    my $timeline       = $self->{ 'CONTROL' }->{ 'initial' }->{ "Latest checkpoint's TimeLineID" };

    # OFFSET part of the file name: at most 6 last characters of the part
    # of REDO location that follows "/" (zero-padded to 8 by sprintf below).
    my $offset = $redo_location;
    $offset =~ s#.*/##;
    $offset =~ s/^.*?(.{0,6})$/$1/;

    my $output_filename = sprintf '%s.%08s.backup', $self->convert_wal_location_and_timeline_to_filename( $redo_location, $timeline ), $offset;

    my @content_lines = @{ $self->{ 'backup_file_data' } };
    splice( @content_lines, 1, 0, sprintf 'STOP WAL LOCATION: %s (file %s)', $final_location, $self->convert_wal_location_and_timeline_to_filename( $final_location, $timeline ) );

    # This timestamp is taken when the backup ends, so it is the STOP TIME;
    # START TIME is already among the lines copied from backup_file_data.
    splice( @content_lines, 4, 0, sprintf 'STOP TIME: %s', strftime( '%Y-%m-%d %H:%M:%S %Z', localtime time ) );

    my $content = join( "\n", @content_lines ) . "\n";

    my $filename = File::Spec->catfile( $self->{ 'temp-dir' }, $output_filename );

    my $error;
    if ( open my $fh, '>', $filename ) {

        # Buffered data can fail to reach the disk as late as on close, so
        # both calls have to succeed for the file to be trusted.
        $error = $OS_ERROR unless ( print { $fh } $content ) and close $fh;
    }
    else {
        $error = $OS_ERROR;
    }

    if ( defined $error ) {
        $self->log->fatal( 'Cannot write .backup file %s : %s', $filename, $error );
        return;
    }

    $self->{ 'dot_backup_filename' } = $output_filename;
    return;
}
162
=head1 wait_for_checkpoint_location_change()

Polls control data of PGDATA every 5 seconds (hardcoded, as there is not
much sense in parametrizing it), and returns as soon as the value of
B<Latest checkpoint location> differs from the one seen when the function
was called.

Control data from the last poll is stored as the "final" snapshot.

=cut

sub wait_for_checkpoint_location_change {
    my ( $self ) = @_;

    my $watched_key      = 'Latest checkpoint location';
    my $initial_location = $self->get_control_data()->{ $watched_key };

    $self->log->log( 'Waiting for checkpoint' ) if $self->verbose;

    # Always sleeps before the first check, and keeps every snapshot, so
    # that the last one stored is the one in which the change was seen.
    my $snapshot;
    do {
        sleep 5;
        $snapshot = $self->get_control_data();
        $self->{ 'CONTROL' }->{ 'final' } = $snapshot;
    } until $snapshot->{ $watched_key } ne $initial_location;

    $self->log->log( 'Checkpoint .' ) if $self->verbose;

    return;
}
184
=head1 make_backup_label_temp_file()

Normal hot backup contains file named 'backup_label' in PGDATA archive.

Since this is not normal hot backup - PostgreSQL will not create this file,
and it has to be created separately by I<omnipitr-backup-slave>.

This file is created in temp directory (it is B<not> created in PGDATA), and
is included in tar in such a way that, on uncompressing, it will get to
unarchived PGDATA.

Lines of the file are also stored in the object (as C<backup_file_data>),
because make_dot_backup_file() builds the .backup file from them.

Problems with writing the file are reported as fatal.

=cut

sub make_backup_label_temp_file {
    my $self = shift;

    my $redo_location = $self->{ 'CONTROL' }->{ 'initial' }->{ "Latest checkpoint's REDO location" };
    my $last_location = $self->{ 'CONTROL' }->{ 'initial' }->{ "Latest checkpoint location" };
    my $timeline      = $self->{ 'CONTROL' }->{ 'initial' }->{ "Latest checkpoint's TimeLineID" };

    my @content_lines = ();
    push @content_lines, sprintf 'START WAL LOCATION: %s (file %s)', $redo_location, $self->convert_wal_location_and_timeline_to_filename( $redo_location, $timeline );
    push @content_lines, sprintf 'CHECKPOINT LOCATION: %s', $last_location;
    push @content_lines, sprintf 'START TIME: %s', strftime( '%Y-%m-%d %H:%M:%S %Z', localtime time );
    push @content_lines, 'LABEL: OmniPITR_Slave_Hot_Backup';

    $self->{ 'backup_file_data' } = \@content_lines;
    my $content = join( "\n", @content_lines ) . "\n";

    my $filename = File::Spec->catfile( $self->{ 'temp-dir' }, 'backup_label' );

    my $error;
    if ( open my $fh, '>', $filename ) {

        # Buffered data can fail to reach the disk as late as on close, so
        # both calls have to succeed - a truncated backup_label must not end
        # up in the archive unnoticed.
        $error = $OS_ERROR unless ( print { $fh } $content ) and close $fh;
    }
    else {
        $error = $OS_ERROR;
    }
    return unless defined $error;

    $self->log->fatal( 'Cannot write backup_label file %s : %s', $filename, $error );
    return;
}
222
=head1 convert_wal_location_and_timeline_to_filename()

Helper function which converts WAL location and timeline number into
filename that given location will be in.

Arguments:

=over

=item * WAL location, in I<SERIES>/I<OFFSET> form, both parts being
hexadecimal, for example C<0/1A000020>

=item * timeline number, as decimal integer

=back

Returns 24 character file name, built from three 8 character, zero padded
parts: timeline (converted to hexadecimal), series, and offset without its
last 6 characters.

=cut

sub convert_wal_location_and_timeline_to_filename {
    my $self = shift;
    my ( $location, $timeline ) = @_;

    my ( $series, $offset ) = split m{/}, $location;

    # The last 6 hex digits are dropped - presumably they address bytes
    # inside single segment (true for 16MB segments; confirm for other sizes).
    $offset =~ s/.{0,6}$//;

    # Series and offset are already hexadecimal strings, so they are only
    # padded. Timeline is a decimal number, and has to be converted - with
    # %08s timeline 10 would be written as 00000010 instead of 0000000A.
    my $location_filename = sprintf '%08X%08s%08s', $timeline, $series, $offset;

    return $location_filename;
}
242
=head1 compress_pgdata()

Does everything that is needed to archive the data directory: creates the
backup_label file in temp directory, and then tars (and optionally
compresses) PGDATA together with that file.

Content of pg_log, xlog segments, archive_status files, recovery.conf and
postmaster.pid are left out of the archive. pg_log and pg_xlog are left out
completely when they are symlinks.

=cut

sub compress_pgdata {
    my ( $self ) = @_;

    $self->make_backup_label_temp_file();

    my $timer_name = 'Compressing $PGDATA';
    $self->log->time_start( $timer_name ) if $self->verbose;
    $self->start_writers( 'data' );

    my $data_dir  = $self->{ 'data-dir' };
    my $temp_dir  = $self->{ 'temp-dir' };
    my $data_name = basename( $data_dir );

    # backup_label sits in temp-dir; inside the archive that directory gets
    # renamed, so that the file lands directly in PGDATA.
    ( my $label_dir = $temp_dir ) =~ s{^/*}{};
    $label_dir =~ s{/*$}{};
    my $transform_command = sprintf 's#^%s/#%s/#', $label_dir, $data_name;

    my @excludes = (
        qw( pg_log/* pg_xlog/0* pg_xlog/archive_status/* recovery.conf postmaster.pid ),
        grep { -l File::Spec->catfile( $data_dir, $_ ) } qw( pg_log pg_xlog ),
    );

    $self->tar_and_compress(
        'work_dir'  => dirname( $data_dir ),
        'tar_dir'   => [ $data_name, File::Spec->catfile( $temp_dir, 'backup_label' ) ],
        'excludes'  => [ map { join '/', $data_name, $_ } @excludes ],
        'transform' => $transform_command,
    );

    $self->log->time_finish( $timer_name ) if $self->verbose;

    return;
}
279
=head1 pause_xlog_removal()

Creates trigger file that will pause removal of old segments by
I<omnipitr-restore>.

Process ID of current process is written to the file. After the file has
been created, this fact is noted in the object (as
C<removal-pause-trigger-created>), so that destructor knows that the file
should be removed.

Inability to create the file is reported as fatal.

=cut

sub pause_xlog_removal {
    my $self = shift;

    if ( open my $fh, '>', $self->{ 'removal-pause-trigger' } ) {
        print $fh $PROCESS_ID, "\n";
        close $fh;
        $self->{ 'removal-pause-trigger-created' } = 1;
        return;
    }

    # The conversion has to be lowercase %s - %S is not a valid sprintf
    # conversion, and the reason of failure would not be shown.
    $self->log->fatal(
        'Cannot create/write to removal pause trigger (%s) : %s',
        $self->{ 'removal-pause-trigger' },
        $OS_ERROR
    );
}
302
=head1 unpause_xlog_removal()

Removes trigger file, effectively unpausing removal of old, obsolete log
segments in I<omnipitr-restore>.

The note about the file being created by this object is dropped as well, so
the destructor will not try to remove the file again.

=cut

sub unpause_xlog_removal {
    my ( $self ) = @_;

    my $trigger_file = $self->{ 'removal-pause-trigger' };

    # Result of unlink is not checked - it is a best effort cleanup.
    unlink $trigger_file;
    delete $self->{ 'removal-pause-trigger-created' };

    return;
}
316
=head1 DESTROY()

Destructor for object - removes pause trigger, if it was created by this
object and not removed yet, and then lets parent class do its own cleanup
(if parent class has any destructor).

=cut

sub DESTROY {
    my $self = shift;

    # Destructor can run at any moment (also while exception is propagating
    # or while program is exiting), so it must not leak changes of these
    # global variables to whatever code happens to be running.
    local ( $!, $@, $? );

    unlink( $self->{ 'removal-pause-trigger' } ) if $self->{ 'removal-pause-trigger-created' };

    # Calling not existing method would raise an error inside of destructor,
    # so parent destructor is called only if there is one.
    $self->SUPER::DESTROY() if $self->can( 'SUPER::DESTROY' );

    return;
}
329
=head1 read_args()

Function which does all the parsing, and transformation of command line
arguments.

Results are stored in the object:

=over

=item * every option, except for --dst-local and --dst-remote, under its
own name

=item * destinations as C<< $self->{'destination'}->{'local'} >> and
C<< $self->{'destination'}->{'remote'} >> - arrays (possibly empty) of
hashes with C<compression> and C<path> keys, without duplicates

=item * source of WAL files as hash with C<compression> and C<path> keys

=item * filename template with time placeholders and __HOSTNAME__ already
expanded

=item * logger object, built from --log template

=back

Raises exception if the command line cannot be parsed, or if --log was not
given. All other checks are left for validate_args().

=cut

sub read_args {
    my $self = shift;

    my @argv_copy = @ARGV;

    my %args = (
        'temp-dir' => $ENV{ 'TMPDIR' } || '/tmp',
        'gzip-path'          => 'gzip',
        'bzip2-path'         => 'bzip2',
        'lzma-path'          => 'lzma',
        'tar-path'           => 'tar',
        'nice-path'          => 'nice',
        'rsync-path'         => 'rsync',
        'pgcontroldata-path' => 'pg_controldata',
        'filename-template'  => '__HOSTNAME__-__FILETYPE__-^Y-^m-^d.tar__CEXT__',
    );

    # NOTE(review): 'pid-file' takes no value here (it is a plain flag). If
    # it is supposed to be a path, the specification should be 'pid-file=s'
    # - confirm against the code that uses this option.
    croak( 'Error while reading command line arguments. Please check documentation in doc/omnipitr-backup-slave.pod' )
        unless GetOptions(
        \%args,
        'data-dir|D=s',
        'source|s=s',
        'dst-local|dl=s@',
        'dst-remote|dr=s@',
        'temp-dir|t=s',
        'log|l=s',
        'filename-template|f=s',
        'removal-pause-trigger|p=s',
        'pid-file',
        'verbose|v',
        'gzip-path|gp=s',
        'bzip2-path|bp=s',
        'lzma-path|lp=s',
        'nice-path|np=s',
        'tar-path|tp=s',
        'rsync-path|rp=s',
        'pgcontroldata-path|pp=s',
        );

    croak( '--log was not provided - cannot continue.' ) unless $args{ 'log' };

    # On command line ^ is used in place of % in time placeholders.
    for my $key ( qw( log filename-template ) ) {
        $args{ $key } =~ tr/^/%/;
    }

    for my $key ( grep { !/^dst-(?:local|remote)$/ } keys %args ) {
        $self->{ $key } = $args{ $key };
    }

    for my $type ( qw( local remote ) ) {
        my $D = [];
        $self->{ 'destination' }->{ $type } = $D;

        next unless defined $args{ 'dst-' . $type };

        # Repeated destinations are used only once, order is preserved.
        my %temp_for_uniq = ();
        my @items = grep { !$temp_for_uniq{ $_ }++ } @{ $args{ 'dst-' . $type } };

        for my $item ( @items ) {
            my $current = { 'compression' => 'none', };
            if ( $item =~ s/\A(gzip|bzip2|lzma)=// ) {
                $current->{ 'compression' } = $1;
            }
            $current->{ 'path' } = $item;
            push @{ $D }, $current;
        }
    }

    # --source can be missing at this point (it is validate_args() that
    # complains about it), so it is parsed only when it was given.
    my %source = (
        'compression' => 'none',
        'path'        => $args{ 'source' },
    );
    if ( defined $source{ 'path' } && $source{ 'path' } =~ s/\A(gzip|bzip2|lzma)=// ) {
        $source{ 'compression' } = $1;
    }
    $self->{ 'source' } = \%source;

    $self->{ 'filename-template' } = strftime( $self->{ 'filename-template' }, localtime time() );
    $self->{ 'filename-template' } =~ s/__HOSTNAME__/hostname()/ge;

    # We do it here so it will actually work for reporting problems in validation
    $self->{ 'log_template' } = $args{ 'log' };
    $self->{ 'log' }          = OmniPITR::Log->new( $self->{ 'log_template' } );

    $self->log->log( 'Called with parameters: %s', join( ' ', @argv_copy ) ) if $self->verbose;

    return;
}
428
=head1 validate_args()

Checks values that were read from command line, and reports the first
problem it finds as fatal.

Paths of compression programs are intentionally not checked - such a path
does not have to be a file path, it can be just a program name (this is the
default), so the check would be relatively complex, while its benefits
would be pretty limited.

=cut

sub validate_args {
    my ( $self ) = @_;

    # Data directory
    my $data_dir = $self->{ 'data-dir' };
    $self->log->fatal( 'Data-dir was not provided!' ) if not defined $data_dir;
    $self->log->fatal( 'Provided data-dir (%s) does not exist!',   $data_dir ) if not -e $data_dir;
    $self->log->fatal( 'Provided data-dir (%s) is not directory!', $data_dir ) if not -d $data_dir;
    $self->log->fatal( 'Provided data-dir (%s) is not readable!',  $data_dir ) if not -r $data_dir;

    # At least one destination, of any type, is required
    my $dst_count = 0;
    $dst_count += scalar @{ $self->{ 'destination' }->{ $_ } } for qw( local remote );
    $self->log->fatal( "No --dst-* has been provided!" ) if 0 == $dst_count;

    # Template of names of created files
    my $template = $self->{ 'filename-template' };
    $self->log->fatal( "Filename template does not contain __FILETYPE__ placeholder!" ) if $template !~ /__FILETYPE__/;
    $self->log->fatal( "Filename template cannot contain / or \\ characters!" ) if $template =~ m{[/\\]};

    # Wal archive
    my $wal_source = $self->{ 'source' }->{ 'path' };
    $self->log->fatal( 'Source of WAL files was not provided!' ) if not defined $wal_source;
    $self->log->fatal( 'Provided source of wal files (%s) does not exist!',   $wal_source ) if not -e $wal_source;
    $self->log->fatal( 'Provided source of wal files (%s) is not directory!', $wal_source ) if not -d $wal_source;
    $self->log->fatal( 'Provided source of wal files (%s) is not readable!',  $wal_source ) if not -r $wal_source;

    # Temporary directory - it also must not contain "#"
    my $temp_dir = $self->{ 'temp-dir' };
    $self->log->fatal( 'Temp-dir was not provided!' ) if not defined $temp_dir;
    $self->log->fatal( 'Provided temp-dir (%s) does not exist!',   $temp_dir ) if not -e $temp_dir;
    $self->log->fatal( 'Provided temp-dir (%s) is not directory!', $temp_dir ) if not -d $temp_dir;
    $self->log->fatal( 'Provided temp-dir (%s) is not writable!',  $temp_dir ) if not -w $temp_dir;
    $self->log->fatal( 'Provided temp-dir (%s) contains # character!', $temp_dir ) if $temp_dir =~ /#/;

    # Removal pause trigger: the file cannot exist yet, but it has to be
    # possible to create it
    my $trigger = $self->{ 'removal-pause-trigger' };
    $self->log->fatal( 'Removal pause trigger name was not provided!' ) if not defined $trigger;
    $self->log->fatal( 'Provided removal pause trigger file (%s) already exists!', $trigger ) if -e $trigger;

    my $trigger_dir = dirname( $trigger );
    $self->log->fatal( 'Directory for provided removal pause trigger (%s) does not exist!',   $trigger ) if not -e $trigger_dir;
    $self->log->fatal( 'Directory for provided removal pause trigger (%s) is not directory!', $trigger ) if not -d $trigger_dir;
    $self->log->fatal( 'Directory for provided removal pause trigger (%s) is not writable!',  $trigger ) if not -w $trigger_dir;

    # Local destinations have to be existing, writable directories
    my $local_destinations = $self->{ 'destination' }->{ 'local' };
    return if not $local_destinations;

    for my $destination ( @{ $local_destinations } ) {
        my $dir = $destination->{ 'path' };
        $self->log->fatal( 'Choosen local destination dir (%s) does not exist. Cannot continue.',   $dir ) if not -e $dir;
        $self->log->fatal( 'Choosen local destination dir (%s) is not directory. Cannot continue.', $dir ) if not -d $dir;
        $self->log->fatal( 'Choosen local destination dir (%s) is not writable. Cannot continue.',  $dir ) if not -w $dir;
    }

    return;
}
484
485 1;
Note: See TracBrowser for help on using the browser.