root/branches/omnipitr-with-slave-backups-calling-master/lib/OmniPITR/Program/Backup.pm

Revision 233, 14.8 kB (checked in by depesz, 3 years ago)

alpha version

Line 
1 package OmniPITR::Program::Backup;
2 use strict;
3 use warnings;
4
5 use base qw( OmniPITR::Program );
6
7 use File::Spec;
8 use File::Path qw( mkpath rmtree );
9 use English qw( -no_match_vars );
10 use OmniPITR::Tools qw( ext_for_compression run_command );
11 use Cwd;
12
=head1 run()

Entry point that drives the whole backup.

Work is done in this order:

=over

=item * list of required compressions is established

=item * local base destination is chosen for every compression (matters
when some compression is requested only for remote destination)

=item * data archive, and then xlog archive, are made (each step is timed
in log when running verbose)

=item * ready archives are delivered to all destinations

=back

=cut

sub run {
    my ( $self ) = @_;

    $self->get_list_of_all_necessary_compressions();
    $self->choose_base_local_destinations();

    # Label used for timing => method doing the work. Order matters.
    my @archive_steps = (
        [ 'Making data archive', 'make_data_archive' ],
        [ 'Making xlog archive', 'make_xlog_archive' ],
    );

    for my $step ( @archive_steps ) {
        my ( $label, $method ) = @{ $step };

        $self->log->time_start( $label ) if $self->verbose;
        $self->$method();
        $self->log->time_finish( $label ) if $self->verbose;
    }

    $self->deliver_to_all_destinations();

    $self->log->log( 'All done.' );
    return;
}
41
=head1 make_xlog_archive()

Just a stub method, that has to be overridden in subclasses.

If it gets called directly (i.e. subclass did not provide its own
implementation) it raises exception, via Carp::croak, saying so.

=cut

sub make_xlog_archive {
    my $self = shift;

    # Carp is not among the modules loaded by the "use" lines of this package,
    # so plain croak() would die with "Undefined subroutine" instead of
    # the message below. Load it here and call it fully qualified.
    require Carp;
    Carp::croak( "make_xlog_archive() method in OmniPITR::Program::Backup was not overridden!" );
}
52
=head1 psql()

Runs given query via psql. Command line is built from the list stored in
$self->{'psql'}, with the query appended as last argument.

Command is run by run_command(), with temp directory passed as its first
argument. When running verbose, execution is timed in log.

If returned status has true 'error_code', fatal error is raised.

Otherwise stdout of the psql is returned.

=cut

sub psql {
    my ( $self, $query ) = @_;

    $self->prepare_temp_directory();

    my @psql_call = ( @{ $self->{ 'psql' } }, $query );

    $self->log->time_start( $query ) if $self->verbose;
    my $result = run_command( $self->{ 'temp-dir' }, @psql_call );
    $self->log->time_finish( $query ) if $self->verbose;

    if ( $result->{ 'error_code' } ) {
        $self->log->fatal( 'Running [%s] via psql failed: %s', $query, $result );
    }

    return $result->{ 'stdout' };
}
80
=head1 wait_for_file()

Helper function which waits for file to appear.

Arguments are: directory to look in, and regular expression that name of
the file has to match.

Directory is scanned once per second. After 10 seconds of waiting
information about it is logged. If nothing shows up within an hour, fatal
error is raised.

Return value is name of (first found) matching file, without directory.

=cut

sub wait_for_file {
    my ( $self, $dir, $filename_regexp ) = @_;

    my $max_wait = 3600;    # It's 1 hour. There is no technical need to wait longer.
    my $waited   = 0;

    while ( $waited <= $max_wait ) {
        if ( 10 == $waited ) {
            $self->log->log( 'Waiting for file matching %s in directory %s', $filename_regexp, $dir );
        }

        my $dh;
        unless ( opendir( $dh, $dir ) ) {
            $self->log->fatal( 'Cannot open %s for scanning: %s', $dir, $OS_ERROR );
        }
        my @candidates = grep { $_ =~ $filename_regexp } readdir $dh;
        closedir $dh;

        if ( scalar @candidates ) {
            my $reply_filename = $candidates[ 0 ];
            $self->log->log( 'File %s arrived after %u seconds.', $reply_filename, $waited ) if $self->verbose;
            return $reply_filename;
        }

        # Nothing yet - take a nap, and look again.
        sleep 1;
        $waited++;
    }

    $self->log->fatal( 'Waited 1 hour for file matching %s, but it did not appear. Something is wrong. No sense in waiting longer.', $filename_regexp );

    return;
}
117
=head1 choose_base_local_destinations()

Fills $self->{'base'} - hash which, for every compression schema listed in
$self->{'compressions'}, holds single local directory in which archive for
this compression will be created.

First local destination using given compression wins.

In case some compression schema exists only for remote destination,
information about it is logged, and directory named after the compression
is created within temp directory, to be used as the base.

=cut

sub choose_base_local_destinations {
    my ( $self ) = @_;

    my %base_for;
    $base_for{ $_ } = undef for @{ $self->{ 'compressions' } };
    $self->{ 'base' } = \%base_for;

    for my $local ( @{ $self->{ 'destination' }->{ 'local' } } ) {
        my $compression = $local->{ 'compression' };
        $base_for{ $compression } = $local->{ 'path' } unless defined $base_for{ $compression };
    }

    my @unfilled = grep { !defined $base_for{ $_ } } keys %base_for;
    return unless scalar @unfilled;

    $self->log->log( 'These compression(s) were given only for remote destinations. Usually this is not desired: %s', join( ', ', @unfilled ) );

    $self->prepare_temp_directory();

    # Temp dir has to be read after prepare_temp_directory() was called.
    for my $compression ( @unfilled ) {
        my $tmp_dir = File::Spec->catfile( $self->{ 'temp-dir' }, $compression );
        mkpath( $tmp_dir );
        $base_for{ $compression } = $tmp_dir;
    }

    return;
}
154
=head1 start_writers()

Opens one filehandle per compression schema found in $self->{'base'}, and
stores them (keyed by compression name) in $self->{'writers'}.

Takes one argument - type of data (passed to get_archive_filename() to get
name of output file).

For compression "none" the filehandle writes straight to output file. For
any other compression it is a pipe to shell command running compression
program (optionally via nice), with output redirected to output file.

Failure to open file or command raises fatal error.

=cut

sub start_writers {
    my ( $self, $data_type ) = @_;

    my %writers = ();
    my $base    = $self->{ 'base' };

    COMPRESSION:
    for my $type ( keys %{ $base } ) {
        my $filename       = $self->get_archive_filename( $data_type, $type );
        my $full_file_path = File::Spec->catfile( $base->{ $type }, $filename );

        if ( 'none' eq $type ) {
            my $plain_fh;
            if ( open $plain_fh, '>', $full_file_path ) {
                $writers{ $type } = $plain_fh;
                $self->log->log( "Starting \"none\" writer to $full_file_path" ) if $self->verbose;
                next COMPRESSION;
            }
            $self->log->fatal( 'Cannot write to %s : %s', $full_file_path, $OS_ERROR );
        }

        # Command is run by shell (redirection is needed), hence quoting of
        # every element.
        my @command = ();
        push @command, quotemeta( $self->{ 'nice-path' } ) unless $self->{ 'not-nice' };
        push @command, map { quotemeta $_ } ( $self->{ $type . '-path' }, '--stdout', '-' );
        push @command, '>', quotemeta( $full_file_path );

        $self->log->log( "Starting \"%s\" writer to %s", $type, $full_file_path ) if $self->verbose;

        my $pipe_fh;
        if ( open $pipe_fh, '|-', join( ' ', @command ) ) {
            $writers{ $type } = $pipe_fh;
            next COMPRESSION;
        }
        $self->log->fatal( 'Cannot open command. Error: %s, Command: %s', $OS_ERROR, \@command );
    }

    $self->{ 'writers' } = \%writers;
    return;
}
201
=head1 get_tablespaces_and_transforms()

Helper function.  Takes no arguments.  Uses pg_tblspc directory (within
$self->{'data-dir'}) and returns two values:

=over

=item * listref of the physical locations for tar to include in the backup

=item * listref of the transform regexs that tar will need to apply in
order for those directories to get untarred to the correct location from
pg_tblspc's perspective

=back

Only entries of pg_tblspc which have all-numeric name, and are symbolic
links, are taken into account.

Tablespace whose location cannot be resolved to physical path is reported
as error in log and skipped - otherwise it would put undefined value on the
list of directories for tar, and generate transform with empty pattern,
which matches (and rewrites) every path in the archive.

=cut

sub get_tablespaces_and_transforms {
    my $self = shift;

    # Identify any tablespaces and get those
    my $tablespace_dir = File::Spec->catfile( $self->{ 'data-dir' }, "pg_tblspc" );
    my ( %tablespaces, @transform_regexs );

    if ( -e $tablespace_dir ) {
        opendir( my $dh, $tablespace_dir )
            or $self->log->fatal( 'Unable to open tablespace directory %s: %s', $tablespace_dir, $OS_ERROR );

        for my $filename ( readdir $dh ) {
            next if $filename !~ /\A\d+\z/;    # Filename should be all numeric
            my $full_name = File::Spec->catfile( $tablespace_dir, $filename );
            next if !-l $full_name;            # It should be a symbolic link
            my $link = readlink $full_name;
            next unless $link;                 # Not a valid link

            # Location pointed to by the link might itself be (or contain)
            # a link, and tar needs the actual, bottom, location.
            my $physical = Cwd::abs_path( $link );
            unless ( defined $physical ) {
                $self->log->error( 'Cannot resolve location %s of tablespace %s. Skipping it.', $link, $filename );
                next;
            }

            $tablespaces{ $link } = $physical;
        }
        closedir $dh;

        # Populate the regexes to put these directories under tablespaces with transforms so that the actual physical location
        # is transformed into the 1-level deep link that the pg_tblspc files are pointing at.  We substr because tar strips leading /
        push @transform_regexs, map { "s,^" . substr( $tablespaces{ $_ }, 1 ) . ",tablespaces$_," } keys %tablespaces;
    }

    # Names go in as argument, not as part of format, so that "%" in a path cannot break formatting.
    $self->log->log( "Including tablespaces: %s\n", join( ", ", keys %tablespaces ) ) if $self->verbose && keys %tablespaces;

    return ( [ values %tablespaces ], \@transform_regexs );
}
244
=head1 get_archive_filename()

Helper function, which takes filetype and compression schema to use, and
returns filename generated from $self->{'filename-template'}.

In the template, every __FILETYPE__ is replaced by given filetype, and
every __CEXT__ by extension for given compression (as returned by
ext_for_compression()). Compression that is not given, or is "none", means
empty extension.

=cut

sub get_archive_filename {
    my ( $self, $type, $compression ) = @_;

    # Lack of compression, and "none", both mean plain file.
    my $ext = '';
    if ( defined $compression && 'none' ne $compression ) {
        $ext = ext_for_compression( $compression );
    }

    ( my $filename = $self->{ 'filename-template' } ) =~ s/__FILETYPE__/$type/g;
    $filename =~ s/__CEXT__/$ext/g;

    return $filename;
}
266
=head1 tar_and_compress()

Worker function which does all of the actual tar, and sending data to
compression filehandles (should be opened before, and stored in
$self->{'writers'}).

Takes hash (not hashref) as argument, and uses following keys from it:

=over

=item * tar_dir - arrayref with list of directories to compress

=item * work_dir - what should be current working directory when executing
tar

=item * excludes - optional key, that (if exists) is treated as arrayref of
shell globs (tar dir) of items to exclude from backup

=item * transform - optional key, that (if exists) is treated as value (or
arrayref of values) for --transform option for tar

=back

If tar will print anything to STDERR it will be logged. Error status code is
ignored, as it is expected that tar will generate errors (due to files
modified while archiving).

Fatal error is raised when: work_dir cannot be entered (or previous
directory cannot be restored), tar cannot be started, reading from tar
fails, writing to any of writers fails or is incomplete, or SIGPIPE is
received.

When tar finishes, all writers are closed and removed from $self, and
temporary file with tar's stderr is removed.

Requires following keys in $self:

=over

=item * nice-path

=item * tar-path

=back

=cut

sub tar_and_compress {
    my $self = shift;
    my %ARGS = @_;

    local $SIG{ 'PIPE' } = sub { $self->log->fatal( 'Got SIGPIPE while tarring %s for %s', $ARGS{ 'tar_dir' }, $self->{ 'sigpipeinfo' } ); };

    my @compression_command = ( $self->{ 'tar-path' }, 'cf', '-' );
    unshift @compression_command, $self->{ 'nice-path' } unless $self->{ 'not-nice' };

    if ( $ARGS{ 'excludes' } ) {
        push @compression_command, map { '--exclude=' . $_ } @{ $ARGS{ 'excludes' } };
    }

    if ( $ARGS{ 'transform' } ) {
        my @transforms = ref $ARGS{ 'transform' } ? @{ $ARGS{ 'transform' } } : ( $ARGS{ 'transform' } );
        push @compression_command, map { '--transform=' . $_ } @transforms;
    }

    push @compression_command, @{ $ARGS{ 'tar_dir' } };

    # Command goes through shell (stderr redirection), so every element is quoted.
    my $compression_str = join ' ', map { quotemeta $_ } @compression_command;

    $self->prepare_temp_directory();

    my $tar_stderr_filename = File::Spec->catfile( $self->{ 'temp-dir' }, 'tar.stderr' );
    $compression_str .= ' 2> ' . quotemeta( $tar_stderr_filename );

    # tar has to be started in work_dir. Starting it anywhere else would
    # silently archive wrong files, hence failed chdir is fatal.
    my $previous_dir = getcwd;
    if ( $ARGS{ 'work_dir' } ) {
        chdir $ARGS{ 'work_dir' }
            or $self->log->fatal( 'Cannot chdir to %s : %s', $ARGS{ 'work_dir' }, $OS_ERROR );
    }

    my $tar;
    unless ( open $tar, '-|', $compression_str ) {
        $self->log->fatal( 'Cannot start tar (%s) : %s', $compression_str, $OS_ERROR );
    }

    if ( $ARGS{ 'work_dir' } ) {
        chdir $previous_dir
            or $self->log->fatal( 'Cannot chdir back to %s : %s', $previous_dir, $OS_ERROR );
    }

    my $buffer;
    READ:
    while ( 1 ) {
        my $len = sysread( $tar, $buffer, 8192 );
        unless ( defined $len ) {

            # Interrupted system call is not a real error - just retry.
            next READ if $!{ 'EINTR' };

            # Any other read error must not be mistaken for end of data,
            # as it would leave truncated archive that looks complete.
            $self->log->fatal( 'Cannot read output of tar (%s) : %s', $compression_str, $OS_ERROR );
        }
        last READ if 0 == $len;

        for my $type ( keys %{ $self->{ 'writers' } } ) {
            $self->{ 'sigpipeinfo' } = $type;
            my $written = syswrite( $self->{ 'writers' }->{ $type }, $buffer, $len );
            unless ( defined $written ) {
                $self->log->fatal( 'Writing %u bytes to filehandle for <%s> compression failed: %s', $len, $type, $OS_ERROR );
            }
            next if $written == $len;
            $self->log->fatal( "Writting %u bytes to filehandle for <%s> compression wrote only %u bytes ?!", $len, $type, $written );
        }
    }
    close $tar;

    for my $fh ( values %{ $self->{ 'writers' } } ) {
        close $fh;
    }

    delete $self->{ 'writers' };

    my $stderr;
    unless ( open $stderr, '<', $tar_stderr_filename ) {
        $self->log->log( 'Cannot open tar stderr file (%s) for reading: %s', $tar_stderr_filename, $OS_ERROR );
        return;
    }
    my $stderr_output = do { local $/; <$stderr> };
    close $stderr;

    # Content is already in memory, so the file can go regardless of whether
    # there is anything to report.
    unlink $tar_stderr_filename;

    return unless $stderr_output;
    $self->log->log( 'Tar (%s) generated these output on stderr:', $compression_str );
    $self->log->log( '==============================================' );
    $self->log->log( '%s', $stderr_output );
    $self->log->log( '==============================================' );
    return;
}
382
=head1 deliver_to_all_destinations()

Single point to call to get backups delivered everywhere they were
requested: first to all local destinations, and then to all remote ones.

=cut

sub deliver_to_all_destinations {
    my ( $self ) = @_;

    # Order matters - local copies first, remote transfers afterwards.
    for my $deliver ( qw( deliver_to_all_local_destinations deliver_to_all_remote_destinations ) ) {
        $self->$deliver();
    }

    return;
}
399
=head1 deliver_to_all_local_destinations()

Copies backups (both data and xlog archives) to all local destinations
which are not also base destinations for their respective compressions.

Archives are copied from base destination of given compression, using
File::Copy.

Failed copy is not fatal - it is logged as error, and $self->{'had_errors'}
is set.

=cut

sub deliver_to_all_local_destinations {
    my $self = shift;
    return unless $self->{ 'destination' }->{ 'local' };

    # File::Copy is not among the modules loaded by the "use" lines of this
    # package, so plain copy() would die with "Undefined subroutine" on first
    # delivery. Load it here, and call copy fully qualified.
    require File::Copy;

    for my $dst ( @{ $self->{ 'destination' }->{ 'local' } } ) {
        my $B = $self->{ 'base' }->{ $dst->{ 'compression' } };

        # Archives were created directly in base destination - nothing to copy.
        next if $dst->{ 'path' } eq $B;

        for my $type ( qw( data xlog ) ) {

            my $filename             = $self->get_archive_filename( $type, $dst->{ 'compression' } );
            my $source_filename      = File::Spec->catfile( $B,               $filename );
            my $destination_filename = File::Spec->catfile( $dst->{ 'path' }, $filename );

            my $time_msg = sprintf 'Copying %s to %s', $source_filename, $destination_filename;
            $self->log->time_start( $time_msg ) if $self->verbose;

            my $rc = File::Copy::copy( $source_filename, $destination_filename );

            $self->log->time_finish( $time_msg ) if $self->verbose;

            unless ( $rc ) {
                $self->log->error( 'Cannot copy %s to %s : %s', $source_filename, $destination_filename, $OS_ERROR );
                $self->{ 'had_errors' } = 1;
            }

        }
    }
    return;
}
437
=head1 deliver_to_all_remote_destinations()

Delivers backups (both data and xlog archives) to all remote destinations,
using program given in $self->{'rsync-path'}, run via run_command().

Archives are sent from base destination of given compression. Name of
remote file is path of destination, with exactly one trailing slash, plus
name of archive.

Failed delivery is not fatal - it is logged as error, and
$self->{'had_errors'} is set.

=cut

sub deliver_to_all_remote_destinations {
    my ( $self ) = @_;

    my $remotes = $self->{ 'destination' }->{ 'remote' };
    return unless $remotes;

    for my $dst ( @{ $remotes } ) {

        my $base_dir = $self->{ 'base' }->{ $dst->{ 'compression' } };

        for my $type ( qw( data xlog ) ) {

            my $filename        = $self->get_archive_filename( $type, $dst->{ 'compression' } );
            my $source_filename = File::Spec->catfile( $base_dir, $filename );

            # Whatever number of slashes the path ends with - make it one.
            ( my $destination_filename = $dst->{ 'path' } ) =~ s{/*\z}{/};
            $destination_filename .= $filename;

            my $time_msg = sprintf 'Copying %s to %s', $source_filename, $destination_filename;
            $self->log->time_start( $time_msg ) if $self->verbose;

            my $response = run_command( $self->{ 'temp-dir' }, $self->{ 'rsync-path' }, $source_filename, $destination_filename );

            $self->log->time_finish( $time_msg ) if $self->verbose;

            next unless $response->{ 'error_code' };

            $self->log->error( 'Cannot send archive %s to %s: %s', $source_filename, $destination_filename, $response );
            $self->{ 'had_errors' } = 1;
        }
    }
    return;
}
474
475 1;
Note: See TracBrowser for help on using the browser.