root/trunk/omnipitr/lib/OmniPITR/Program/Archive.pm

Revision 106, 13.4 kB (checked in by depesz, 4 years ago)

allow uppercase hex digits in segment names

Line 
1 package OmniPITR::Program::Archive;
2 use strict;
3 use warnings;
4
5 use base qw( OmniPITR::Program );
6
7 use Carp;
8 use OmniPITR::Tools qw( :all );
9 use English qw( -no_match_vars );
10 use File::Basename;
11 use File::Spec;
12 use File::Path qw( make_path remove_tree );
13 use File::Copy;
14 use Storable;
15 use Getopt::Long;
16
=head1 run()

Entry point used by the actual script in bin/. It wraps all of the work done by the script, with the sole exception of reading and validating command line arguments.

Those two tasks (reading and validating arguments) live in this module as well, but they are invoked from L<OmniPITR::Program::new()>.

Every step is a separate method with a self-explanatory name - check the documentation of a given method for details.

=cut

sub run {
    my ( $self ) = @_;

    # Steps run strictly in this order: compressed copies are produced before
    # anything is sent, and cleanup happens only after sending to all destinations.
    for my $step ( qw( read_state prepare_temp_directory make_all_necessary_compressions send_to_destinations cleanup ) ) {
        $self->$step();
    }

    $self->log->log( 'Segment %s successfully sent to all destinations.', $self->{ 'segment' } );
    return;
}
37
=head1 send_to_destinations()

Does all the actual sending of segments to local and remote destinations.

It keeps its state to be able to continue in case of error.

Since both local and remote destinations are handled in the same way, there is no point in duplicating the code to 2 methods.

Important notice - this function has to have the ability to choose whether to use temp file (for compressed destinations), or original segment (for uncompressed ones). This is done by this line:

    my $local_file = $dst->{ 'compression' } eq 'none' ? $self->{ 'segment' } : $self->get_temp_filename_for( $dst->{ 'compression' } );

Progress is recorded per destination path B<and> compression, so the same directory can be given twice - once plain and once compressed - and both copies get sent.

=cut

sub send_to_destinations {
    my $self = shift;

    for my $destination_type ( qw( local remote ) ) {
        next unless my $dst_list = $self->{ 'destination' }->{ $destination_type };
        for my $dst ( @{ $dst_list } ) {
            next if $self->segment_already_sent( $destination_type, $dst );

            # Uncompressed destinations get the original segment, compressed ones the copy from temp-dir.
            my $local_file = $dst->{ 'compression' } eq 'none' ? $self->{ 'segment' } : $self->get_temp_filename_for( $dst->{ 'compression' } );

            # Strip trailing slashes so exactly one separates the directory and the file name.
            my $destination_file_path = $dst->{ 'path' };
            $destination_file_path =~ s{/*\z}{};
            $destination_file_path .= '/' . basename( $local_file );

            my $comment = 'Sending ' . $local_file . ' to ' . $destination_file_path;

            $self->log->time_start( $comment ) if $self->{ 'verbose' };
            my $response = run_command( $self->{ 'temp-dir' }, 'rsync', $local_file, $destination_file_path );
            $self->log->time_finish( $comment ) if $self->{ 'verbose' };

            if ( $response->{ 'error_code' } ) {

                # $response is a hash ref - dump it to text so the log shows its content.
                # Data::Dumper is not among this module's imports, so load it here.
                require Data::Dumper;
                $self->log->fatal( "Cannot send segment %s to %s : %s", $local_file, $destination_file_path, Data::Dumper::Dumper( $response ) );
            }
            $self->{ 'state' }->{ 'sent' }->{ $destination_type }->{ _sent_state_key( $dst ) } = 1;
            $self->save_state();
        }
    }
    return;
}

=head1 segment_already_sent()

Simple function, that checks if segment has been already sent to given destination, and if yes - logs such information.

=cut

sub segment_already_sent {
    my $self = shift;
    my ( $type, $dst ) = @_;
    return unless $self->{ 'state' }->{ 'sent' }->{ $type };
    return unless $self->{ 'state' }->{ 'sent' }->{ $type }->{ _sent_state_key( $dst ) };
    $self->log->log( 'Segment already sent to %s. Skipping.', $dst->{ 'path' } );
    return 1;
}

# Key under which delivery to $dst is recorded in state.
# Uncompressed destinations use the bare path (the key format used so far), while compressed
# ones get a "<compression>=" prefix - the same form as on the command line - so that
# a plain and a compressed destination sharing one path do not overwrite each other's entry.
sub _sent_state_key {
    my ( $dst ) = @_;
    return $dst->{ 'path' } if $dst->{ 'compression' } eq 'none';
    return $dst->{ 'compression' } . '=' . $dst->{ 'path' };
}
96
=head1 cleanup()

Called only once the segment has been successfully compressed and sent to every destination.

Removes the temp directory holding compressed copies of the segment, and - when one is in use - the state file of the segment.

=cut

sub cleanup {
    my ( $self ) = @_;

    remove_tree( $self->{ 'temp-dir' } );

    # Without --state-dir there is no state file to remove.
    my $state_file = $self->{ 'state-file' };
    unlink $state_file if $state_file;

    return;
}
111
=head1 get_list_of_all_necessary_compressions()

Walks all local and remote destinations and collects the distinct compressions they require (other than 'none'). The result is stored as an array ref in $self->{ 'compressions' }, in no particular order.

This way the segment is compressed only once per format, even when several destinations want the same compressed format.

=cut

sub get_list_of_all_necessary_compressions {
    my ( $self ) = @_;

    # Destination lists that were never set are treated as empty.
    my @destinations = map { @{ $self->{ 'destination' }->{ $_ } || [] } } qw( local remote );

    my %seen = map { ( $_->{ 'compression' } => 1 ) } @destinations;
    delete $seen{ 'none' };

    $self->{ 'compressions' } = [ keys %seen ];
    return;
}
134
=head1 make_all_necessary_compressions()

Wraps all work required to compress segment to all necessary formats.

Call to actual compressor has to be done via "bash -c" to be able to easily use run_command() function which has side benefits of getting stdout, stderr, and proper fetching error codes.

Overhead of additional fork+exec for bash should be negligible.

Compressions for which segment_already_compressed() reports a valid existing copy are skipped. After each successful compression, the MD5 of the compressed file is stored in state and state is saved.

=cut

sub make_all_necessary_compressions {
    my $self = shift;
    $self->get_list_of_all_necessary_compressions();

    for my $compression ( @{ $self->{ 'compressions' } } ) {
        next if $self->segment_already_compressed( $compression );
        my $compressed_filename = $self->get_temp_filename_for( $compression );

        # --gzip-path / --bzip2-path / --lzma-path select the binary; fall back to the compression name.
        my $compressor_binary = $self->{ $compression . '-path' } || $compression;

        # quotemeta() backslash-escapes non-word characters so file names survive the trip through bash.
        my $compression_command = sprintf 'nice %s --stdout %s > %s', $compressor_binary, quotemeta( $self->{ 'segment' } ), quotemeta( $compressed_filename );

        $self->log->time_start( 'Compressing with ' . $compression ) if $self->{ 'verbose' };
        my $response = run_command( $self->{ 'temp-dir' }, 'bash', '-c', $compression_command );
        $self->log->time_finish( 'Compressing with ' . $compression ) if $self->{ 'verbose' };

        if ( $response->{ 'error_code' } ) {

            # Data::Dumper is not among this module's imports - load it here and call it fully
            # qualified, otherwise this error path itself dies with "Undefined subroutine".
            require Data::Dumper;
            $self->log->fatal( 'Error while compressing with %s : %s', $compression, Data::Dumper::Dumper( $response ) );
        }

        # Remember checksum of the result so a rerun can trust (or discard) this file.
        $self->{ 'state' }->{ 'compressed' }->{ $compression } = file_md5sum( $compressed_filename );
        $self->save_state();
    }
    return;
}
170
=head1 segment_already_compressed()

Helper that tells whether the segment already has a usable copy compressed with the given compression.

It needs an MD5 recorded in state for that compression, and the compressed temp file must still exist with the same MD5. This way a file damaged between the prior run and now is not trusted: on mismatch the file is removed, an error is logged, and false is returned so the caller recompresses.

=cut

sub segment_already_compressed {
    my ( $self, $type ) = @_;

    my $expected_md5 = $self->{ 'state' }->{ 'compressed' }->{ $type };
    return unless $expected_md5;

    my $compressed_file = $self->get_temp_filename_for( $type );
    return unless -e $compressed_file;

    my $actual_md5 = file_md5sum( $compressed_file );
    if ( $actual_md5 ne $expected_md5 ) {
        unlink $compressed_file;
        $self->log->error( 'Segment already compressed to %s, but with bad MD5 (file: %s, state: %s), recompressing.', $type, $actual_md5, $expected_md5 );
        return;
    }

    $self->log->log( 'Segment has been already compressed with %s.', $type );
    return 1;
}
199
=head1 get_temp_filename_for()

Helper that builds the full path (inside temp-dir) of the segment copy compressed with the given compression: the base name of the segment followed by the extension returned by ext_for_compression().

=cut

sub get_temp_filename_for {
    my ( $self, $type ) = @_;

    my $file_name = basename( $self->{ 'segment' } ) . ext_for_compression( $type );
    return File::Spec->catfile( $self->{ 'temp-dir' }, $file_name );
}
212
=head1 prepare_temp_directory()

Helper that builds the path of the per-segment temp directory, creates it (including any missing parent directories), and stores it back in $self->{ 'temp-dir' }.

The path is made of the given temp-dir, the base name of the running program, and the file name of the segment.

For example, when run as omnipitr-archive with temp-dir '/tmp' and segment pg_xlog/000000010000000000000003, the temp directory actually used is /tmp/omnipitr-archive/000000010000000000000003/.

=cut

sub prepare_temp_directory {
    my ( $self ) = @_;

    my @path_parts = ( $self->{ 'temp-dir' }, basename( $PROGRAM_NAME ), basename( $self->{ 'segment' } ) );
    my $segment_temp_dir = File::Spec->catfile( @path_parts );

    make_path( $segment_temp_dir );
    $self->{ 'temp-dir' } = $segment_temp_dir;

    return;
}
230
=head1 read_state()

Helper function to read state from state file.

Name of state file is the same as filename of WAL segment being archived, but it is in state-dir.

Without --state-dir, state starts as an empty in-memory hash and no state file is used.

If the state file exists but cannot be read back as a hash (for example, it was truncated by a crash while being written), the problem is logged and work starts from an empty state. The segment is then simply compressed and sent again, instead of every retry failing on the same file until it is removed by hand.

=cut

sub read_state {
    my $self = shift;
    $self->{ 'state' } = {};

    return unless $self->{ 'state-dir' };

    $self->{ 'state-file' } = File::Spec->catfile( $self->{ 'state-dir' }, basename( $self->{ 'segment' } ) );
    return unless -f $self->{ 'state-file' };

    # retrieve() croaks on a corrupted file (and may return undef on I/O errors).
    my $state = eval { retrieve( $self->{ 'state-file' } ) };
    if ( ref( $state ) ne 'HASH' ) {
        my $reason = $EVAL_ERROR || 'no hash stored in file';
        chomp $reason;
        $self->log->error( 'Cannot read state file %s (%s), starting with empty state.', $self->{ 'state-file' }, $reason );
        return;
    }

    $self->{ 'state' } = $state;
    return;
}
250
=head1 save_state()

Helper function to save state to state-file.

Does nothing when no state file is in use (no --state-dir given).

The state is first written to a temporary file next to the state file, which is then renamed over it. This way a crash in the middle of writing cannot leave a truncated state file behind. If writing fails, the previous state file is left untouched and an error is logged.

=cut

sub save_state {
    my $self = shift;

    return unless $self->{ 'state-file' };

    my $state_file = $self->{ 'state-file' };
    my $temp_file  = $state_file . '.tmp';

    # store() croaks on most failures, but may return false on some I/O errors.
    unless ( store( $self->{ 'state' }, $temp_file ) ) {
        $self->log->error( 'Cannot write state file %s.', $temp_file );
        return;
    }

    # Both files are in the same directory, so rename() replaces the old state file atomically.
    unless ( rename $temp_file, $state_file ) {
        $self->log->error( 'Cannot rename %s to %s : %s', $temp_file, $state_file, $OS_ERROR );
        unlink $temp_file;
    }

    return;
}
266
=head1 read_args()

Function which does all the parsing, and transformation of command line arguments.

It also verifies base facts about passed WAL segment name, but all other validations are done in a separate function: L<validate_args()>.

=cut

sub read_args {
    my $self = shift;

    my @argv_copy = @ARGV;

    my %args = (
        'data-dir'   => '.',
        'temp-dir'   => $ENV{ 'TMPDIR' } || '/tmp',
        'gzip-path'  => 'gzip',
        'bzip2-path' => 'bzip2',
        'lzma-path'  => 'lzma',
    );

    croak( 'Error while reading command line arguments. Please check documentation in doc/omnipitr-archive.pod' )
        unless GetOptions(
        \%args,
        'bzip2-path|bp=s',
        'data-dir|D=s',
        'dst-local|dl=s@',
        'dst-remote|dr=s@',
        'force-data-dir|f',
        'gzip-path|gp=s',
        'log|l=s',
        'lzma-path|lp=s',
        'pid-file=s',
        'state-dir|s=s',
        'temp-dir|t=s',
        'verbose|v',
        );

    croak( '--log was not provided - cannot continue.' ) unless $args{ 'log' };

    # '^' is accepted as a stand-in for '%' in the log template
    # (presumably because '%' is special inside PostgreSQL's archive_command).
    $args{ 'log' } =~ tr/^/%/;

    # force-data-dir must be copied too - validate_args() reads it from $self.
    for my $key ( qw( data-dir temp-dir state-dir pid-file verbose gzip-path bzip2-path lzma-path force-data-dir ) ) {
        $self->{ $key } = $args{ $key };
    }

    for my $type ( qw( local remote ) ) {
        my $D = [];
        $self->{ 'destination' }->{ $type } = $D;

        next unless defined $args{ 'dst-' . $type };

        # Drop duplicated destinations, keeping first-seen order.
        my %temp_for_uniq = ();
        my @items = grep { !$temp_for_uniq{ $_ }++ } @{ $args{ 'dst-' . $type } };

        for my $item ( @items ) {

            # An optional "<compression>=" prefix selects compression; default is none.
            my $current = { 'compression' => 'none', };
            if ( $item =~ s/\A(gzip|bzip2|lzma)=// ) {
                $current->{ 'compression' } = $1;
            }
            $current->{ 'path' } = $item;
            push @{ $D }, $current;
        }
    }

    # We do it here so it will actually work for reporting problems in validation
    $self->{ 'log_template' } = $args{ 'log' };
    $self->{ 'log' }          = OmniPITR::Log->new( $self->{ 'log_template' } );

    # These could theoretically go into validation, but we need to check if we can get anything to {'segment'}
    $self->log->fatal( 'WAL segment file name has not been given' ) if 0 == scalar @ARGV;
    $self->log->fatal( 'More than 1 WAL segment file name has been given' ) if 1 < scalar @ARGV;

    $self->{ 'segment' } = shift @ARGV;

    $self->log->log( 'Called with parameters: %s', join( ' ', @argv_copy ) ) if $self->{ 'verbose' };

    return;
}
345
=head1 validate_args()

Does all necessary validation of given command line arguments.

One exception is for compression programs paths - technically, it could be validated in here, but benefit would be pretty limited, and code to do so relatively complex, as compression program path might, but doesn't have to be actual file path - it might be just program name (without path), which is the default.

--state-dir is required when more than one destination is given. It is checked (exists, is a directory, is writable) whenever it is given, because state is read from and written to it even for a single destination.

On success, $self->{ 'segment' } is replaced with the full path to the segment file.

=cut

sub validate_args {
    my $self = shift;

    unless ( $self->{ 'force-data-dir' } ) {
        $self->log->fatal( "Given data-dir (%s) is not valid", $self->{ 'data-dir' } ) unless -d $self->{ 'data-dir' } && -f File::Spec->catfile( $self->{ 'data-dir' }, 'PG_VERSION' );
    }

    my $dst_count = scalar( @{ $self->{ 'destination' }->{ 'local' } } ) + scalar( @{ $self->{ 'destination' }->{ 'remote' } } );
    $self->log->fatal( "No --dst-* has been provided!" ) if 0 == $dst_count;

    # With several destinations, state is needed to resume after a partial failure.
    $self->log->fatal( "More than 1 --dst-* has been provided, but no --state-dir!" ) if 1 < $dst_count && !$self->{ 'state-dir' };

    # read_state()/save_state() use --state-dir whenever it is set, so validate it even for a single destination.
    if ( $self->{ 'state-dir' } ) {
        $self->log->fatal( "Given --state-dir (%s) does not exist",     $self->{ 'state-dir' } ) unless -e $self->{ 'state-dir' };
        $self->log->fatal( "Given --state-dir (%s) is not a directory", $self->{ 'state-dir' } ) unless -d $self->{ 'state-dir' };
        $self->log->fatal( "Given --state-dir (%s) is not writable",    $self->{ 'state-dir' } ) unless -w $self->{ 'state-dir' };
    }

    # NOTE(review): PostgreSQL also hands timeline history (*.history) and backup label (*.backup)
    # files to archive_command; such names are rejected here - confirm this is intended.
    $self->log->fatal( 'Given segment name is not valid (%s)', $self->{ 'segment' } ) unless basename( $self->{ 'segment' } ) =~ m{\A[a-fA-F0-9]{24}\z};

    # Relative segment paths are resolved against data-dir.
    my $segment_file_name = $self->{ 'segment' };
    $segment_file_name = File::Spec->catfile( $self->{ 'data-dir' }, $self->{ 'segment' } ) unless File::Spec->file_name_is_absolute( $segment_file_name );

    $self->log->fatal( 'Given segment (%s) does not exist.',  $segment_file_name ) unless -e $segment_file_name;
    $self->log->fatal( 'Given segment (%s) is not a file.',   $segment_file_name ) unless -f $segment_file_name;
    $self->log->fatal( 'Given segment (%s) is not readable.', $segment_file_name ) unless -r $segment_file_name;

    # Expected segment size: 16MB (256**3 bytes). -s gives a false value for an empty file, hence "|| 0".
    my $expected_size = 256**3;
    my $file_size = ( -s $segment_file_name ) || 0;
    $self->log->fatal( 'Given segment (%s) has incorrect size (%u vs %u).', $segment_file_name, $file_size, $expected_size ) unless $expected_size == $file_size;

    $self->{ 'segment' } = $segment_file_name;
    return;
}
387
388 1;
Note: See TracBrowser for help on using the browser.