root/trunk/omnipitr/lib/OmniPITR/Program/Archive.pm

Revision 174, 13.4 kB (checked in by depesz, 4 years ago)

First stage of refactoring

Line 
1 package OmniPITR::Program::Archive;
2 use strict;
3 use warnings;
4
5 use base qw( OmniPITR::Program );
6
7 use Carp;
8 use OmniPITR::Tools qw( :all );
9 use English qw( -no_match_vars );
10 use File::Basename;
11 use File::Spec;
12 use File::Path qw( mkpath rmtree );
13 use File::Copy;
14 use Storable;
15 use Getopt::Long;
16
=head1 run()

Entry point used by the script in bin/. It performs the whole archiving of a single WAL segment - everything except reading and validating command line arguments.

Argument handling (read_args() and validate_args()) also lives in this module, but it is invoked from L<OmniPITR::Program::new()>.

Every step is a separate, descriptively named method - consult the documentation of that method for details.

=cut

sub run {
    my $self = shift;

    $self->read_state();
    $self->prepare_temp_directory( basename( $self->{ 'segment' } ) );

    # Remaining steps take no arguments and must run in exactly this order.
    for my $step ( qw( make_all_necessary_compressions send_to_destinations cleanup ) ) {
        $self->$step();
    }

    $self->log->log( 'Segment %s successfully sent to all destinations.', $self->{ 'segment' } );
    return;
}
37
=head1 send_to_destinations()

Copies the segment to every configured destination, using the rsync program (--rsync-path) through run_command().

Local and remote destinations are processed by the same loop; they are handled identically.

Progress is recorded in the state after every destination (see save_state()), so a re-run after an error skips destinations that were already done (see segment_already_sent()).

Uncompressed destinations (compression "none") receive the original segment file. Compressed destinations receive the matching file from the temp directory, as named by get_temp_filename_for().

A failure when sending to the --dst-backup destination is only logged as an error, and that destination is still marked as sent. A failure for any other destination is reported with log->fatal.

=cut

sub send_to_destinations {
    my $self = shift;

    for my $type ( qw( local remote ) ) {
        my $destinations = $self->{ 'destination' }->{ $type };
        next unless $destinations;

        for my $dst ( @{ $destinations } ) {
            next if $self->segment_already_sent( $type, $dst );

            my $compression = $dst->{ 'compression' };
            my $source_file = ( $compression eq 'none' ) ? $self->{ 'segment' } : $self->get_temp_filename_for( $compression );

            # Errors for the backup destination are tolerated (logged, not fatal).
            my $is_backup = ( $self->{ 'dst-backup' } && $dst->{ 'path' } eq $self->{ 'dst-backup' } ) ? 1 : undef;

            # Target is "<destination dir without trailing slashes>/<file name>".
            ( my $target = $dst->{ 'path' } ) =~ s{/*\z}{};
            $target .= '/' . basename( $source_file );

            my $comment = sprintf 'Sending %s to %s', $source_file, $target;

            $self->log->time_start( $comment ) if $self->verbose;
            my $response = run_command( $self->{ 'temp-dir' }, $self->{ 'rsync-path' }, $source_file, $target );
            $self->log->time_finish( $comment ) if $self->verbose;

            if ( $response->{ 'error_code' } && $is_backup ) {
                $self->log->error( "Sending segment %s to backup destination %s generated (ignored) error: %s", $source_file, $target, $response );
            }
            elsif ( $response->{ 'error_code' } ) {
                $self->log->fatal( "Cannot send segment %s to %s : %s", $source_file, $target, $response );
            }

            $self->{ 'state' }->{ 'sent' }->{ $type }->{ $dst->{ 'path' } } = 1;
            $self->save_state();
        }
    }
    return;
}
92
=head1 segment_already_sent()

Checks the state for a record of the segment having been sent to the given destination (identified by its type - "local" or "remote" - and its path).

Returns 1, after logging a "Skipping" message, if it was sent. Otherwise returns an empty list / undef.

=cut

sub segment_already_sent {
    my ( $self, $type, $dst ) = @_;

    my $sent_for_type = $self->{ 'state' }->{ 'sent' }->{ $type };
    return if !$sent_for_type || !$sent_for_type->{ $dst->{ 'path' } };

    $self->log->log( 'Segment already sent to %s. Skipping.', $dst->{ 'path' } );
    return 1;
}
107
=head1 cleanup()

Final step, reached only after the segment has been compressed and delivered to every destination.

Deletes the temp directory (holding the compressed copies of the segment) together with its contents. If a state file is in use, it is removed as well.

=cut

sub cleanup {
    my ( $self ) = @_;

    my $temp_dir   = $self->{ 'temp-dir' };
    my $state_file = $self->{ 'state-file' };

    rmtree( $temp_dir );
    if ( $state_file ) {
        unlink $state_file;
    }
    return;
}
122
=head1 make_all_necessary_compressions()

Produces every compressed variant of the segment that the configured destinations need.

Each compressor is invoked through "bash -c", so that run_command() can be used. It captures stdout and stderr and reports the exit code properly. The extra fork+exec of bash should cost next to nothing.

Types are handled one at a time, in order. "none" is skipped, and so is any type for which segment_already_compressed() confirms a valid file from an earlier run. After each successful compression, the MD5 of the result is recorded in the state and the state is saved.

=cut

sub make_all_necessary_compressions {
    my $self = shift;
    $self->get_list_of_all_necessary_compressions();

    COMPRESSION:
    for my $type ( @{ $self->{ 'compressions' } } ) {
        next COMPRESSION if $type eq 'none';
        next COMPRESSION if $self->segment_already_compressed( $type );

        my $target  = $self->get_temp_filename_for( $type );
        my $program = $self->{ "${type}-path" } || $type;

        # Both file names are shell-escaped, because the command goes through bash.
        my $shell_cmd = sprintf 'nice %s --stdout %s > %s', $program, quotemeta( $self->{ 'segment' } ), quotemeta( $target );

        my $label = 'Compressing with ' . $type;
        $self->log->time_start( $label ) if $self->verbose;
        my $result = run_command( $self->{ 'temp-dir' }, 'bash', '-c', $shell_cmd );
        $self->log->time_finish( $label ) if $self->verbose;

        $self->log->fatal( 'Error while compressing with %s : %s', $type, $result ) if $result->{ 'error_code' };

        $self->{ 'state' }->{ 'compressed' }->{ $type } = file_md5sum( $target );
        $self->save_state();
    }
    return;
}
160
=head1 segment_already_compressed()

Tells whether a usable compressed copy of the segment (for the given compression type) already exists from an earlier run.

The MD5 recorded in the state must match the MD5 of the file currently in the temp directory.

If the MD5 differs, the stale file is deleted, an error is logged, and false is returned so the caller recompresses. If there is no recorded MD5, or no file, false is returned silently.

=cut

sub segment_already_compressed {
    my ( $self, $type ) = @_;

    my $want_md5 = $self->{ 'state' }->{ 'compressed' }->{ $type };
    return unless $want_md5;

    my $temp_file_name = $self->get_temp_filename_for( $type );
    return unless -e $temp_file_name;

    my $has_md5 = file_md5sum( $temp_file_name );
    unless ( $has_md5 eq $want_md5 ) {
        unlink $temp_file_name;
        $self->log->error( 'Segment already compressed to %s, but with bad MD5 (file: %s, state: %s), recompressing.', $type, $has_md5, $want_md5 );
        return;
    }

    $self->log->log( 'Segment has been already compressed with %s.', $type );
    return 1;
}
189
=head1 get_temp_filename_for()

Returns the full path, inside the temp directory, of the segment's compressed copy for the given compression type.

The file name is the segment's base name plus the extension returned by ext_for_compression().

=cut

sub get_temp_filename_for {
    my ( $self, $type ) = @_;

    my $file_name = basename( $self->{ 'segment' } ) . ext_for_compression( $type );
    return File::Spec->catfile( $self->{ 'temp-dir' }, $file_name );
}
202
=head1 read_state()

Resets {'state'} to an empty hash, then loads any previously saved state for this segment.

State is only used when --state-dir was given. In that case {'state-file'} is set to a file in state-dir that has the same name as the WAL segment. If that file exists, it is loaded with Storable's retrieve().

=cut

sub read_state {
    my $self = shift;
    $self->{ 'state' } = {};

    my $state_dir = $self->{ 'state-dir' };
    return unless $state_dir;

    my $state_file = File::Spec->catfile( $state_dir, basename( $self->{ 'segment' } ) );
    $self->{ 'state-file' } = $state_file;

    $self->{ 'state' } = retrieve( $state_file ) if -f $state_file;
    return;
}
222
=head1 save_state()

Writes {'state'} to {'state-file'} with Storable's store(). Does nothing when no state file is in use (no --state-dir).

=cut

sub save_state {
    my $self       = shift;
    my $state_file = $self->{ 'state-file' };

    store( $self->{ 'state' }, $state_file ) if $state_file;

    return;
}
238
=head1 read_args()

Parses the command line arguments and transforms them into object fields.

It also verifies the basic facts about the WAL segment name argument (exactly one must be given). All other validation is done separately, in L<validate_args()>.

Notes:

=over

=item * "^" characters in --log are turned into "%".

=item * --dst-local / --dst-remote values are de-duplicated. An optional "gzip=", "bzip2=" or "lzma=" prefix selects the compression for that destination; the default is "none".

=item * The logger is created here, before validation, so that validation problems can be logged.

=back

=cut

sub read_args {
    my $self = shift;

    my @argv_copy = @ARGV;

    my %args = (
        'data-dir'   => '.',
        'temp-dir'   => $ENV{ 'TMPDIR' } || '/tmp',
        'gzip-path'  => 'gzip',
        'bzip2-path' => 'bzip2',
        'lzma-path'  => 'lzma',
        'rsync-path' => 'rsync',
    );

    my @option_specs = (
        'bzip2-path|bp=s',
        'data-dir|D=s',
        'dst-backup|db=s',
        'dst-local|dl=s@',
        'dst-remote|dr=s@',
        'force-data-dir|f',
        'gzip-path|gp=s',
        'log|l=s',
        'lzma-path|lp=s',
        'rsync-path|rp=s',
        'pid-file=s',
        'state-dir|s=s',
        'temp-dir|t=s',
        'verbose|v',
    );
    my $options_ok = GetOptions( \%args, @option_specs );
    croak( 'Error while reading command line arguments. Please check documentation in doc/omnipitr-archive.pod' ) unless $options_ok;

    croak( '--log was not provided - cannot continue.' ) unless $args{ 'log' };
    $args{ 'log' } =~ tr/^/%/;

    my @copied_keys = qw( data-dir dst-backup temp-dir state-dir pid-file verbose gzip-path bzip2-path lzma-path force-data-dir rsync-path );
    @{ $self }{ @copied_keys } = @args{ @copied_keys };

    for my $type ( qw( local remote ) ) {
        my @parsed;
        # Always present (possibly empty), so validate_args() can count destinations.
        $self->{ 'destination' }->{ $type } = \@parsed;

        my $raw = $args{ 'dst-' . $type };
        next unless defined $raw;

        my %seen;
        for my $item ( grep { !$seen{ $_ }++ } @{ $raw } ) {
            # Work on a copy: grep returns aliases into the option array.
            my $path        = $item;
            my $compression = 'none';
            $compression = $1 if $path =~ s/\A(gzip|bzip2|lzma)=//;
            push @parsed, { 'compression' => $compression, 'path' => $path };
        }
    }

    # Logger is set up here already, so that it works for reporting problems found during validation.
    $self->{ 'log_template' } = $args{ 'log' };
    $self->{ 'log' }          = OmniPITR::Log->new( $self->{ 'log_template' } );

    # These checks could live in validate_args(), but {'segment'} has to be populated here.
    my $segment_count = scalar @ARGV;
    $self->log->fatal( 'WAL segment file name has not been given' ) if 0 == $segment_count;
    $self->log->fatal( 'More than 1 WAL segment file name has been given' ) if 1 < $segment_count;

    $self->{ 'segment' } = shift @ARGV;

    $self->log->log( 'Called with parameters: %s', join( ' ', @argv_copy ) ) if $self->verbose;

    return;
}
320
=head1 validate_args()

Does all necessary validation of given command line arguments.

One exception is the compression programs' paths. Technically they could be validated here, but the benefit would be pretty limited, and the code to do so relatively complex: a compression program path might, but doesn't have to, be an actual file path. It might be just a program name (without path), which is the default.

Side effects:

=over

=item * If --dst-backup is given and that path exists, it is appended to the local destinations (uncompressed).

=item * On success, {'segment'} is replaced with the full path of the segment file. Names not starting with "/" are resolved against --data-dir.

=back

--state-dir is required when more than one destination is configured. When given, it is always checked (exists, is a directory, is writable), because read_state() and save_state() use it whenever it is set.

The size check (256**3 bytes) applies to plain WAL segment files (24 hex digits). It is based on the file name part of the given path, matching how the name check is done, so that segments given with a directory prefix are checked too.

=cut

sub validate_args {
    my $self = shift;

    unless ( $self->{ 'force-data-dir' } ) {
        $self->log->fatal( "Given data-dir (%s) is not valid", $self->{ 'data-dir' } ) unless -d $self->{ 'data-dir' } && -f File::Spec->catfile( $self->{ 'data-dir' }, 'PG_VERSION' );
    }

    if ( $self->{ 'dst-backup' } ) {
        if ( $self->{ 'dst-backup' } =~ m{\A(gzip|bzip2|lzma)=} ) {
            # Placeholder was "[%]" (an invalid sprintf conversion), so the offending value never appeared in the message.
            $self->log->fatal( 'dst-backup cannot be compressed! [%s]', $self->{ 'dst-backup' } );
        }
        unless ( $self->{ 'dst-backup' } =~ m{\A/} ) {
            $self->log->fatal( 'dst-backup has to be absolute path, and it is not: %s', $self->{ 'dst-backup' } );
        }
        # The backup destination is only added when the path exists at this point.
        if ( -e $self->{ 'dst-backup' } ) {
            push @{ $self->{ 'destination' }->{ 'local' } },
                {
                'compression' => 'none',
                'path'        => $self->{ 'dst-backup' },
                };
        }
    }

    my $dst_count = scalar( @{ $self->{ 'destination' }->{ 'local' } } ) + scalar( @{ $self->{ 'destination' }->{ 'remote' } } );
    $self->log->fatal( "No --dst-* has been provided!" ) if 0 == $dst_count;

    # With several destinations, state is needed to resume a partially completed run.
    if ( 1 < $dst_count && !$self->{ 'state-dir' } ) {
        $self->log->fatal( "More than 1 --dst-* has been provided, but no --state-dir!" );
    }

    # read_state()/save_state() use --state-dir whenever it is set, so check it even for a single destination.
    if ( $self->{ 'state-dir' } ) {
        $self->log->fatal( "Given --state-dir (%s) does not exist",     $self->{ 'state-dir' } ) unless -e $self->{ 'state-dir' };
        $self->log->fatal( "Given --state-dir (%s) is not a directory", $self->{ 'state-dir' } ) unless -d $self->{ 'state-dir' };
        $self->log->fatal( "Given --state-dir (%s) is not writable",    $self->{ 'state-dir' } ) unless -w $self->{ 'state-dir' };
    }

    $self->log->fatal( 'Given segment name is not valid (%s)', $self->{ 'segment' } )
        unless basename( $self->{ 'segment' } ) =~ m{\A(?:[a-fA-F0-9]{24}(?:\.[a-fA-F0-9]{8}\.backup)?|[a-fA-F0-9]{8}\.history)\z};
    my $segment_file_name = $self->{ 'segment' };
    $segment_file_name = File::Spec->catfile( $self->{ 'data-dir' }, $self->{ 'segment' } ) unless $self->{ 'segment' } =~ m{^/};

    $self->log->fatal( 'Given segment (%s) does not exist.',  $segment_file_name ) unless -e $segment_file_name;
    $self->log->fatal( 'Given segment (%s) is not a file.',   $segment_file_name ) unless -f $segment_file_name;
    $self->log->fatal( 'Given segment (%s) is not readable.', $segment_file_name ) unless -r $segment_file_name;

    # Match on the base name, like the name check above. Matching the raw argument skipped this check whenever the segment had a directory prefix.
    if ( basename( $segment_file_name ) =~ m{\A[a-fA-F0-9]{24}\z} ) {
        my $expected_size = 256**3;
        my $file_size     = ( -s $segment_file_name );
        $self->log->fatal( 'Given segment (%s) has incorrect size (%u vs %u).', $segment_file_name, $file_size, $expected_size ) unless $expected_size == $file_size;
    }

    $self->{ 'segment' } = $segment_file_name;
    return;
}
381
382 1;
Note: See TracBrowser for help on using the browser.