root/trunk/omnipitr/lib/OmniPITR/Program/Restore.pm

Revision 115, 17.1 kB (checked in by depesz, 4 years ago)

add and reformat existing docs

Line 
1 package OmniPITR::Program::Restore;
2 use strict;
3 use warnings;
4
5 use base qw( OmniPITR::Program );
6
7 use Carp;
8 use OmniPITR::Tools qw( :all );
9 use English qw( -no_match_vars );
10 use File::Basename;
11 use File::Spec;
12 use File::Path qw( make_path remove_tree );
13 use File::Copy;
14 use Storable;
15 use Data::Dumper;
16 use Getopt::Long;
17 use Cwd;
18
=head1 run()

Main entry point, invoked by the actual script in bin/. It wraps all the
work done by the script, with the sole exception of reading and
validating command line arguments.

Reading and validating arguments is also implemented in this module, but
those methods are invoked from L<OmniPITR::Program::new()>.

Each step of the loop is delegated to a method whose name describes what
it does - check the documentation of that method for details.

The loop has no exit condition of its own; the process ends when one of
the called methods exits (L<try_to_restore_and_exit()> does so once the
segment has been restored, or when a finish request is handled).

=cut

sub run {
    my $self = shift;

    # SIGUSR1 requests an immediate finish. The handler only records the
    # request; try_to_restore_and_exit() acts on it in the next iteration.
    $SIG{ 'USR1' } = sub {
        $self->{ 'finish' } = 'immediate';
        return;
    };

    ITERATION:
    while ( 1 ) {
        $self->try_to_restore_and_exit();
        sleep 1;

        # As soon as any finish request is pending, skip the remaining
        # housekeeping and go straight back to the restore attempt.
        for my $step ( qw( check_for_trigger_file do_some_removal ) ) {
            next ITERATION if $self->{ 'finish' };
            $self->$step();
        }
    }
}
51
=head1 do_some_removal()

Wraps all the work needed to remove obsolete WAL segments from the
archive.

The actual I<unlink> calls are made here, but every other step (running
pg_controldata, extracting the last REDO segment, building the list of
files to remove, calling the pre-removal hook) is delegated to dedicated
methods.

Nothing is done unless --remove-unneeded was given, nor while the
--removal-pause-trigger file exists. Processing stops at the first
segment whose pre-removal processing or unlink fails.

=cut

sub do_some_removal {
    my $self = shift;

    return unless $self->{ 'remove-unneeded' };

    my $pause_trigger = $self->{ 'removal-pause-trigger' };
    if ( $pause_trigger && -e $pause_trigger ) {

        # Report the pause only the first time it is noticed in this process.
        if ( !$self->{ 'trigger-logged' } ) {
            $self->{ 'trigger-logged' }++;
            $self->log->log( 'Pause trigger exists (%s), NOT removing any old segments.', $pause_trigger );
        }
        return;
    }

    my $control_data   = $self->get_control_data()                     or return;
    my $last_important = $self->get_last_redo_segment( $control_data ) or return;

    my @doomed = $self->get_list_of_segments_to_remove( $last_important );
    return unless @doomed;

    my $source      = $self->{ 'source' };
    my $compression = $source->{ 'compression' };

    for my $segment ( @doomed ) {
        return unless $self->handle_pre_removal_processing( $segment );

        my $path = File::Spec->catfile( $source->{ 'path' }, $segment );
        $path .= ext_for_compression( $compression ) if $compression;

        my $removed_count = unlink $path;
        if ( $removed_count != 1 ) {
            $self->log->error( 'Error while unlinking %s : %s', $path, $OS_ERROR );
            return;
        }
        $self->log->log( 'Segment %s (%s) removed, as it is too old (older than %s)', $segment, $path, $last_important );
    }
    return;
}
98
=head1 handle_pre_removal_processing()

Before removing an obsolete WAL segment, I<omnipitr-restore> can call an
arbitrary program to do whatever is necessary - for example, to send the
WAL segment to a backup server.

This is done here. Each segment is first copied (and decompressed, when
the archive is compressed) into the temporary directory, and then the
given program is called.

The temporary directory is always set up so that it "looks" like the
program was called by archive_command from PostgreSQL, i.e.:

=over

=item * Current directory contains pg_xlog directory

=item * Segment is unpacked

=item * Segment is in pg_xlog directory

=item * Handler program is called with segment name like
'pg_xlog/000000010000000500000073'

=back

Returns 1 when the segment may be removed (no hook configured, or the
hook succeeded), and an empty return (after logging an error) when
copying, changing directory, or running the hook failed. The pg_xlog
copy is removed in every case.

=cut

sub handle_pre_removal_processing {
    my $self         = shift;
    my $segment_name = shift;
    return 1 unless $self->{ 'pre-removal-processing' };

    $self->prepare_temp_directory();
    my $xlog_dir  = File::Spec->catfile( $self->{ 'temp-dir' }, 'pg_xlog' );
    my $xlog_file = File::Spec->catfile( $xlog_dir,             $segment_name );
    make_path( $xlog_dir );

    my $comment = 'Copying segment ' . $segment_name . ' to ' . $xlog_file;
    $self->log->time_start( $comment ) if $self->{ 'verbose' };
    my $response = $self->copy_segment_to( $segment_name, $xlog_file );
    $self->log->time_finish( $comment ) if $self->{ 'verbose' };

    if ( $response ) {
        $self->log->error( 'Error while copying segment for pre removal processing for %s : %s', $segment_name, $response );

        # Do not leave a partially written copy behind.
        remove_tree( $xlog_dir );
        return;
    }

    # The hook gets a path relative to the temp directory, so it has to be
    # run from there - refuse to run it from anywhere else.
    my $previous_dir = getcwd();
    unless ( chdir $self->{ 'temp-dir' } ) {
        $self->log->error( 'Cannot chdir to %s for pre removal processing: %s', $self->{ 'temp-dir' }, $OS_ERROR );
        remove_tree( $xlog_dir );
        return;
    }

    my $full_command = $self->{ 'pre-removal-processing' } . " pg_xlog/$segment_name";

    $comment = 'Running pre-removal-processing command: ' . $full_command;

    $self->log->time_start( $comment ) if $self->{ 'verbose' };

    # Same temp-dir key as every other run_command() call in this module.
    my $result = run_command( $self->{ 'temp-dir' }, 'bash', '-c', $full_command );
    $self->log->time_finish( $comment ) if $self->{ 'verbose' };

    chdir $previous_dir;

    remove_tree( $xlog_dir );
    return 1 unless $result->{ 'error_code' };

    $self->log->error( 'Error while calling pre removal processing [%s] : %s', $full_command, $result );

    return;
}
166
=head1 get_list_of_segments_to_remove()

Scans the source directory, and returns the names of all segment files
that are "older" than the last required segment (the REDO segment from
pg_controldata).

Older is defined as alphabetically smaller than the REDO segment name.

When the archive is compressed, only files carrying the compression
extension are considered, and the extension is stripped from the
returned names.

Returns at most X names, in sorted order, where X is defined by the
--remove-at-a-time command line option. Returns nothing if the
directory cannot be read (an error is logged) or if no file qualifies.

=cut

sub get_list_of_segments_to_remove {
    my $self           = shift;
    my $last_important = shift;

    # Always initialise: "my $x = ... if ...;" has undefined behaviour in Perl,
    # and an undef extension would trigger warnings in the regex below.
    my $compression = $self->{ 'source' }->{ 'compression' };
    my $extension   = $compression ? ext_for_compression( $compression ) : '';

    my $dir;
    unless ( opendir( $dir, $self->{ 'source' }->{ 'path' } ) ) {
        $self->log->error( 'Cannot open source directory (%s) for reading: %s', $self->{ 'source' }->{ 'path' }, $OS_ERROR );
        return;
    }
    my @content = readdir $dir;
    closedir $dir;

    my @too_old = ();
    for my $file ( @content ) {

        # In a compressed archive the unlink step always appends the
        # extension, so files without it could never be removed anyway.
        if ( length $extension ) {
            next unless $file =~ s/\Q$extension\E\z//;
        }
        next unless $file =~ m{\A[a-fA-F0-9]{24}(?:\.[a-fA-F0-9]{8}\.backup)?\z};
        next unless $file lt $last_important;
        push @too_old, $file;
    }
    return if 0 == scalar @too_old;

    $self->log->log( '%u segments too old, to be removed.', scalar @too_old ) if $self->{ 'verbose' };

    my @sorted = sort @too_old;

    # Truncate only when needed: splice() with an offset past the end of
    # the array emits a warning.
    my $limit = $self->{ 'remove-at-a-time' };
    splice( @sorted, $limit ) if scalar @sorted > $limit;

    return @sorted;
}
209
=head1 get_last_redo_segment()

Based on information from pg_controldata, returns the name of the WAL
segment file that contains the latest checkpoint's REDO location, i.e.
the oldest segment that would be required if recovery had to be
restarted.

This is required to be able to tell which files can be safely removed
from the archive.

The REDO location is given as two hexadecimal halves ("XXX/YYYYYYYY"),
while the TimeLineID is a decimal number. The file name is made of the
timeline, the high half, and the low half divided by the segment size
(16MB, the same assumption the original character stripping made), each
as 8 upper-case hexadecimal digits.

=cut

sub get_last_redo_segment {
    my $self = shift;
    my $CD   = shift;

    my $location = $CD->{ "Latest checkpoint's REDO location" };
    my $timeline = $CD->{ "Latest checkpoint's TimeLineID" };

    my ( $xlog_id, $xlog_offset ) = split m{/}, $location;

    # A segment spans 0x1000000 bytes, so the segment number is the offset
    # shifted right by 24 bits. Doing it numerically also copes with
    # offsets shorter than 7 hex digits (segment 0).
    my $segment_number = hex( $xlog_offset ) >> 24;

    # The timeline is decimal in pg_controldata output but hexadecimal in
    # segment file names, so it must be converted - not merely zero-padded.
    return sprintf '%08X%08X%08X', $timeline, hex( $xlog_id ), $segment_number;
}
236
=head1 get_control_data()

Calls pg_controldata for the data directory, and parses its output into
a hash reference of label => value.

Verifies that the output contains 2 critical pieces of information:

=over

=item * Latest checkpoint's REDO location

=item * Latest checkpoint's TimeLineID

=back

Returns the hash reference, or nothing (after logging an error) if
pg_controldata failed or one of the critical values is missing. A line
that cannot be parsed is treated as fatal: the program exits with
status 1.

pg_controldata is run under the C locale: its labels are translated
according to the locale, and the untranslated labels are required here.

=cut

sub get_control_data {
    my $self = shift;

    $self->prepare_temp_directory();

    my $response = do {

        # Scoped: the caller's locale setting comes back when the block ends.
        local $ENV{ 'LC_ALL' } = 'C';
        run_command( $self->{ 'temp-dir' }, $self->{ 'pgcontroldata-path' }, $self->{ 'data-dir' } );
    };
    if ( $response->{ 'error_code' } ) {
        $self->log->error( 'Error while getting pg_controldata for %s: %s', $self->{ 'data-dir' }, $response );
        return;
    }

    my $control_data = {};

    my @lines = split( /\s*\n/, $response->{ 'stdout' } );
    for my $line ( @lines ) {
        unless ( $line =~ m{\A([^:]+):\s*(.*)\z} ) {
            $self->log->error( 'Pg_controldata for %s contained unparseable line: [%s]', $self->{ 'data-dir' }, $line );
            $self->exit_with_status( 1 );
        }
        $control_data->{ $1 } = $2;
    }

    unless ( $control_data->{ "Latest checkpoint's REDO location" } ) {
        $self->log->error( 'Pg_controldata for %s did not contain latest checkpoint redo location', $self->{ 'data-dir' } );
        return;
    }
    unless ( $control_data->{ "Latest checkpoint's TimeLineID" } ) {
        $self->log->error( 'Pg_controldata for %s did not contain latest checkpoint timeline ID', $self->{ 'data-dir' } );
        return;
    }

    return $control_data;
}
286
=head1 try_to_restore_and_exit()

Checks whether the requested WAL segment exists and is ready to be
restored (see the --recovery-delay option).

Also handles pending finish requests (both immediate and smart).

If the segment was restored, the process exits with status 0. A failed
copy, an immediate finish request, or a smart finish request while the
segment is still missing makes it exit with status 1.

If no file can be returned yet, control goes back to the main loop in
L<run()>.

=cut

sub try_to_restore_and_exit {
    my $self = shift;

    if ( $self->{ 'finish' } eq 'immediate' ) {
        $self->log->error( 'Got immediate finish request. Dying.' );
        $self->exit_with_status( 1 );
    }

    my $segment     = $self->{ 'segment' };
    my $compression = $self->{ 'source' }->{ 'compression' };
    my $source_file = File::Spec->catfile( $self->{ 'source' }->{ 'path' }, $segment );
    $source_file .= ext_for_compression( $compression ) if $compression;

    unless ( -e $source_file ) {

        # Without a finish request, simply keep waiting for the segment.
        return unless $self->{ 'finish' };
        $self->log->error( 'Got finish request. Dying.' );
        $self->exit_with_status( 1 );
        return;
    }

    # The recovery delay is ignored once a (smart) finish was requested.
    my $delay = $self->{ 'recovery-delay' };
    if ( $delay && !$self->{ 'finish' } ) {
        my $mtime          = ( stat $source_file )[ 9 ];
        my $accepted_since = time() - $delay;
        if ( $mtime >= $accepted_since ) {
            if ( $self->{ 'verbose' } && !$self->{ 'logged_delay' } ) {
                $self->log->log( 'Segment %s found, but it is too fresh (mtime = %u, accepted since %u)', $segment, $mtime, $accepted_since );
                $self->{ 'logged_delay' } = 1;
            }
            return;
        }
    }

    my $destination = File::Spec->catfile( $self->{ 'data-dir' }, $self->{ 'segment_destination' } );

    my $comment = 'Copying segment ' . $segment . ' to ' . $destination;
    $self->log->time_start( $comment ) if $self->{ 'verbose' };
    my $error = $self->copy_segment_to( $segment, $destination );
    $self->log->time_finish( $comment ) if $self->{ 'verbose' };

    if ( $error ) {
        $self->log->error( $error );
        $self->exit_with_status( 1 );
    }

    $self->log->log( 'Segment %s restored', $segment );
    $self->exit_with_status( 0 );
}
353
=head1 copy_segment_to()

Helper that copies a segment from the archive to the given destination,
decompressing it when the archive is compressed.

Returns nothing on success, or a human-readable error message on
failure.

=cut

sub copy_segment_to {
    my $self = shift;
    my ( $segment_name, $destination ) = @_;

    my $compression = $self->{ 'source' }->{ 'compression' };
    my $source_file = File::Spec->catfile( $self->{ 'source' }->{ 'path' }, $segment_name );

    if ( !$compression ) {
        return if copy( $source_file, $destination );
        return sprintf( 'Copying %s to %s failed: %s', $source_file, $destination, $OS_ERROR );
    }

    $source_file .= ext_for_compression( $compression );

    # The command is run through bash -c, so every interpolated piece is
    # quotemeta()-escaped.
    my $decompressor = $self->{ "$compression-path" };
    my $command = sprintf '%s --decompress --stdout %s > %s', map { quotemeta( $_ ) } $decompressor, $source_file, $destination;

    $self->prepare_temp_directory();

    my $result = run_command( $self->{ 'temp-dir' }, 'bash', '-c', $command );
    if ( $result->{ 'error_code' } ) {
        return sprintf( 'Uncompressing %s to %s failed: %s', $source_file, $destination, Dumper( $result ) );
    }
    return;
}
385
=head1 check_for_trigger_file()

Checks whether the --finish-trigger file exists and, if it does, reads
it to choose the finish mode. A file whose whole content is "NOW"
(optionally followed by one newline) requests an immediate finish. Any
other content requests a smart finish.

The chosen mode is stored in $self->{ 'finish' } and logged. An existing
trigger file that cannot be opened is reported via the logger's fatal().

=cut

sub check_for_trigger_file {
    my $self    = shift;
    my $trigger = $self->{ 'finish-trigger' };

    return if !$trigger || !-e $trigger;

    open my $fh, '<', $trigger
        or return $self->log->fatal( 'Finish trigger (%s) exists, but cannot be open?! : %s', $trigger, $OS_ERROR );

    my $content = do {
        local $INPUT_RECORD_SEPARATOR = undef;    # slurp the whole file
        <$fh>;
    };
    close $fh;

    $self->{ 'finish' } = ( $content =~ m{\ANOW\n?\z} ) ? 'immediate' : 'smart';

    $self->log->log( 'Finish trigger found, %s mode.', $self->{ 'finish' } );
    return;
}
411
=head1 exit_with_status()

Terminates the program with the given exit status. If
L<prepare_temp_directory()> has created the temporary directory, it is
removed first.

=cut

sub exit_with_status {
    my ( $self, $code ) = @_;

    if ( $self->{ 'temp-dir-prepared' } ) {
        remove_tree( $self->{ 'temp-dir' } );
    }

    exit $code;
}
426
=head1 prepare_temp_directory()

Helper that builds the path of this program's own temporary directory
and creates it. Calls after the first one do nothing.

The path is made of the configured temp-dir and the base name of the
running program. For example, for temp-dir '/tmp' and program
'omnipitr-restore', the directory used is /tmp/omnipitr-restore/. The
resulting path replaces $self->{ 'temp-dir' }.

=cut

sub prepare_temp_directory {
    my $self = shift;

    unless ( $self->{ 'temp-dir-prepared' } ) {
        my $program_dir = File::Spec->catfile( $self->{ 'temp-dir' }, basename( $PROGRAM_NAME ) );
        make_path( $program_dir );
        @{ $self }{ 'temp-dir', 'temp-dir-prepared' } = ( $program_dir, 1 );
    }

    return;
}
446
=head1 read_args()

Parses the command line arguments, and transforms them into keys of
$self.

Besides parsing, it verifies basic facts about the invocation: --log
must be given, --source must be given, and exactly two positional
arguments (WAL segment name and its destination) must be present. All
other validation is done in a separate function: L<validate_args()>.

=cut

sub read_args {
    my $self = shift;

    # Kept for the verbose "Called with parameters" log line.
    my @argv_copy = @ARGV;

    my %args = (
        'bzip2-path'         => 'bzip2',
        'data-dir'           => '.',
        'gzip-path'          => 'gzip',
        'lzma-path'          => 'lzma',
        'pgcontroldata-path' => 'pg_controldata',
        'remove-at-a-time'   => 3,
        'temp-dir'           => $ENV{ 'TMPDIR' } || '/tmp',
    );

    my @option_specs = (
        'bzip2-path|bp=s',
        'data-dir|D=s',
        'finish-trigger|f=s',
        'gzip-path|gp=s',
        'log|l=s',
        'lzma-path|lp=s',
        'pgcontroldata-path|pp=s',
        'pid-file=s',
        'pre-removal-processing|h=s',
        'remove-at-a-time|rt=i',
        'recovery-delay|w=i',
        'removal-pause-trigger|p=s',
        'remove-unneeded|r',
        'source|s=s',
        'temp-dir|t=s',
        'verbose|v',
    );

    unless ( GetOptions( \%args, @option_specs ) ) {
        croak( 'Error while reading command line arguments. Please check documentation in doc/omnipitr-restore.pod' );
    }

    croak( '--log was not provided - cannot continue.' ) unless $args{ 'log' };

    # '^' may be used in place of '%' in the log template (presumably
    # because '%' is special in PostgreSQL's restore_command).
    $args{ 'log' } =~ tr/^/%/;

    # 'source' and 'log' are stored in processed form below.
    my %processed_separately = map { $_ => 1 } qw( source log );
    $self->{ $_ } = $args{ $_ } for grep { !$processed_separately{ $_ } } keys %args;

    # Set up the logger right away, so that problems found during
    # validation can be reported through it.
    $self->{ 'log_template' } = $args{ 'log' };
    $self->{ 'log' }          = OmniPITR::Log->new( $self->{ 'log_template' } );

    $self->log->fatal( 'Source path not provided!' ) unless $args{ 'source' };

    my $source = $args{ 'source' };
    if ( $source =~ s/\A(gzip|bzip2|lzma)=// ) {
        $self->{ 'source' }->{ 'compression' } = $1;
    }
    $self->{ 'source' }->{ 'path' } = $source;

    # These checks could live in validate_args(), but the segment* keys of
    # $self need something to be filled with.
    my $positional_count = scalar @ARGV;
    $self->log->fatal( 'WAL segment file name and/or destination have not been given' ) if $positional_count < 2;
    $self->log->fatal( 'Too many arguments given.' ) if $positional_count > 2;

    ( $self->{ 'segment' }, $self->{ 'segment_destination' } ) = @ARGV;

    $self->{ 'finish' } = '';

    if ( $self->{ 'verbose' } ) {
        $self->log->log( 'Called with parameters: %s', join( ' ', @argv_copy ) );
    }

    return;
}
528
=head1 validate_args()

Does all necessary validation of the parsed command line arguments:

=over

=item * data-dir must be a directory containing a PG_VERSION file

=item * the segment name must look like a WAL segment file name (24 hex
digits), optionally followed by a backup history suffix

=item * the source path must be a readable and writable directory

=back

Any failure is reported through the logger's fatal().

Compression program paths are deliberately not validated. Technically it
could be done here, but the benefit would be pretty limited, and the
code to do so relatively complex: a compression program path might, but
doesn't have to, be an actual file path - it may be just a program name
(without path), which is the default.

=cut

sub validate_args {
    my $self = shift;

    my $data_dir = $self->{ 'data-dir' };
    unless ( -d $data_dir && -f File::Spec->catfile( $data_dir, 'PG_VERSION' ) ) {
        $self->log->fatal( 'Given data-dir (%s) is not valid', $data_dir );
    }

    my $segment = $self->{ 'segment' };
    if ( $segment !~ m{\A[a-fA-F0-9]{24}(?:\.[a-fA-F0-9]{8}\.backup)?\z} ) {
        $self->log->fatal( 'Given segment name is not valid (%s)', $segment );
    }

    # Checked in this order; each failing test gets its own message.
    my $source_path   = $self->{ 'source' }->{ 'path' };
    my @source_checks = (
        [ sub { -d $_[ 0 ] }, 'Given source (%s) is not a directory' ],
        [ sub { -r $_[ 0 ] }, 'Given source (%s) is not readable' ],
        [ sub { -w $_[ 0 ] }, 'Given source (%s) is not writable' ],
    );
    for my $check ( @source_checks ) {
        my ( $passes, $message ) = @{ $check };
        $self->log->fatal( $message, $source_path ) unless $passes->( $source_path );
    }

    return;
}

# A module must end by evaluating to a true value for use/require to succeed.
1;
Note: See TracBrowser for help on using the browser.