root/trunk/tools/fast.dump.and.restore/fast.dump

Revision 248, 18.3 kB (checked in by depesz, 3 years ago)

more debug info

  • Property svn:executable set to *
Line 
#!/usr/bin/env perl
package main;
use strict;
use warnings;
# Entry point: construct the dumper object and run the whole pipeline.
# All real logic lives in the Omni::Program::Pg::FastDump package below.
my $program = Omni::Program::Pg::FastDump->new();
$program->run();
exit;
9 package Omni::Program::Pg::FastDump;
10 use strict;
11 use warnings;
12 use Carp qw( croak carp );
13 use English qw( -no_match_vars );
14 use Getopt::Long qw( :config no_ignore_case );
15 use Data::Dumper;
16 use Cwd qw( abs_path );
17 use Pod::Usage;
18 use POSIX qw( :sys_wait_h );
19 use File::Spec;
20 use File::Temp qw( tempfile );
21
# Shared state between the SIGCHLD handler and the master loop in
# launch_dumpers(): maps reaped child PID => time() when it was reaped.
# Entries are consumed (deleted) by launch_dumpers().
our %killed_pids = ();

# SIGCHLD handler: reap every finished child without blocking and record
# its finish time. Re-installs itself at the end, which is harmless on
# modern systems and required on platforms with System V signal
# semantics where the handler is reset after delivery.
sub REAPER {
    my $child;
    while ( ( $child = waitpid( -1, WNOHANG ) ) > 0 ) {
        $killed_pids{ $child } = time();
    }
    $SIG{ 'CHLD' } = \&REAPER;
    return;
}
32
# Constructor. Returns an empty object; all configuration is filled in
# later by read_options().
sub new {
    my $class = shift;
    return bless {}, $class;
}
39
# Top-level flow: parse CLI options, show what will happen, ask the user
# for confirmation, then perform the actual dump.
sub run {
    my $self = shift;

    for my $step ( qw( read_options show_running_details confirm_work make_dump ) ) {
        $self->$step();
    }
    return;
}
49
# Dump pipeline: schema first, then discover tables, split the big ones
# into partial blobs, order blobs by size, and dump them in parallel.
sub make_dump {
    my $self = shift;

    for my $phase ( qw( dump_schema get_list_of_tables split_tables_into_blobs order_blobs launch_dumpers ) ) {
        $self->$phase();
    }
    return;
}
59
# Master process loop: writes the blob manifest (index.lst), then keeps
# up to --jobs worker children running, each dumping one blob via
# make_single_blob(). Finished children are noticed through the REAPER
# SIGCHLD handler and logged with their duration.
sub launch_dumpers {
    my $self = shift;
    # Unbuffered STDOUT so the \r-terminated progress line shows up
    # immediately.
    $OUTPUT_AUTOFLUSH = 1;
    $SIG{ 'CHLD' } = \&REAPER;

    # Count blobs per type for the one-off summary line below.
    my %c = map { ( $_ => 0 ) } qw( total partial full );
    for my $t ( @{ $self->{ 'blobs' } } ) {
        $c{ 'total' }++;
        $c{ $t->{ 'blob_type' } }++;
    }

    # index.lst is both a manifest (one header + one line per blob) and,
    # further down, a log of per-blob completion times.
    open my $fh, ">", File::Spec->catfile( $self->{ 'output' }, 'index.lst' ) or croak( "Cannot create index file: $OS_ERROR\n" );
    printf $fh '%-5s | %-7s | %-32s | %-32s | %s%s', qw( # type schema table condition ), "\n";
    for my $i ( @{ $self->{'blobs'} } ) {
        printf $fh '%5d | %-7s | %-32s | %-32s | %s%s', @{ $i }{ qw( id blob_type schema table ) }, ( $i->{'condition'} || '' ), "\n";
    }

    printf "%d blobs to be processed. %d full and %d partial.\n", @c{ qw( total full partial ) };
    my %running_kids = ();
    while ( 1 ) {
        # Log children that the SIGCHLD handler reaped since the last
        # iteration, and forget about them.
        my @pids = keys %killed_pids;
        for my $killed ( @pids ) {
            my $blob     = delete $running_kids{ $killed };
            my $end_time = delete $killed_pids{ $killed };
            printf $fh "%s dump (#%d) of %s.%s finished after %d seconds.\n", $blob->{ 'blob_type' }, $blob->{ 'id' }, $blob->{ 'schema' }, $blob->{ 'table' }, $end_time - $blob->{ 'started' };
        }
        # Top the worker pool back up to --jobs, one fork per pending
        # blob.
        while ( $self->{ 'jobs' } > scalar keys %running_kids ) {
            last if 0 == scalar @{ $self->{ 'blobs' } };
            my $blob = shift @{ $self->{ 'blobs' } };
            my $pid  = fork();
            croak "cannot fork" unless defined $pid;
            if ( $pid == 0 ) {

                # It's worker process. Drop the inherited CHLD handler
                # so the worker's own child (psql) is waited on normally.
                delete $SIG{ 'CHLD' };
                $self->make_single_blob( $blob );
                exit;
            }

            # It's master.
            $blob->{ 'started' } = time();
            $running_kids{ $pid } = $blob;
        }

        # Progress line: what is running vs. still queued, split by blob
        # type.
        my %what_runs = map { ( $_ => 0 ) } qw( total partial full );
        for my $t ( values %running_kids ) {
            $what_runs{ 'total' }++;
            $what_runs{ $t->{ 'blob_type' } }++;
        }
        my %what_waits = map { ( $_ => 0 ) } qw( total partial full );
        for my $t ( @{ $self->{ 'blobs' } } ) {
            $what_waits{ 'total' }++;
            $what_waits{ $t->{ 'blob_type' } }++;
        }
        printf 'Running: %3d workers (%3d full, %5d partial). Pending: %3d blobs (%3d full, %5d partial).%s',
            $what_runs{ 'total' },
            $what_runs{ 'full' },
            $what_runs{ 'partial' },
            $what_waits{ 'total' }, $what_waits{ 'full' }, $what_waits{ 'partial' }, "     \r",    # just some spaces to clean leftover chars, and rewind to beginning of line
            ;
        last if ( 0 == $what_runs{ 'total' } ) && ( 0 == $what_waits{ 'total' } );
        sleep 60;                                                                                  # sleep will get interrupted when child exits, and then the loop will repeat.
    }
    printf '%sAll done.%s', "\n", "\n";
    return;
}
126
# Worker-side routine: dump one blob (a whole table, or one slice of a
# table for 'partial' blobs) by generating a COPY ... TO stdout script,
# running it through psql, optionally piping through --compressor, and
# redirecting output to the blob's data file. Croaks on any failure.
sub make_single_blob {
    my $self = shift;
    my $blob = shift;

    my $file_name = $self->get_file_name( $blob );
    # Make the worker identifiable in `ps` output.
    $PROGRAM_NAME .= ' ... ' . $file_name;

    my $output_path = File::Spec->catfile( $self->{ 'output' }, $file_name );
    my $sql_filename = $output_path . '.sql';
    open my $sql_fh, '>', $sql_filename or croak( "Cannot write to $sql_filename: $OS_ERROR\n" );

    if ( $blob->{ 'blob_type' } eq 'partial' ) {
        # enable_seqscan=false nudges the planner towards the pkey index
        # for the range condition on a partial dump.
        print $sql_fh "set enable_seqscan = false;\n";
        printf $sql_fh 'COPY (SELECT * FROM %s WHERE %s) TO stdout;%s', $blob->{ 'full_name' }, $blob->{ 'condition' }, "\n";
        $PROGRAM_NAME = 'Partial dump of ' . $blob->{ 'full_name' };
    }
    else {
        printf $sql_fh 'COPY %s TO stdout;%s', $blob->{ 'full_name' }, "\n";
        $PROGRAM_NAME = 'Full dump of ' . $blob->{ 'full_name' };
    }
    close $sql_fh;

    my @cmd = ( $self->{ 'psql' }, '-qAtX', '-f', $sql_filename, );

    # Build a shell pipeline; each element is quotemeta'd to keep the
    # shell from interpreting special characters.
    my $psql_call = join( ' ', map { quotemeta } @cmd );
    if ( $self->{ 'compressor' } ) {
        $psql_call .= ' | ' . quotemeta( $self->{ 'compressor' } ) . ' -c -';
    }

    # stderr goes to a temp file; the already-open handle is read back
    # after system() returns (it starts at position 0, so it sees what
    # the child wrote).
    my ( $stderr_fh, $stderr_filename ) = tempfile( 'fast.dump.XXXXXXXX', 'TMPDIR' => 1, );

    $psql_call .= sprintf ' 2>%s >%s', quotemeta $stderr_filename, quotemeta $output_path;

    system $psql_call;
    # Slurp mode for reading the whole stderr capture at once.
    local $/ = undef;
    my $stderr = <$stderr_fh>;

    close $stderr_fh;
    unlink( $stderr_filename );

    # Decode $CHILD_ERROR ($?): -1 = couldn't run at all, low 7 bits =
    # killing signal, otherwise the exit status is in the high byte.
    my $error_code;
    if ( $CHILD_ERROR == -1 ) {
        $error_code = $OS_ERROR;
    }
    elsif ( $CHILD_ERROR & 127 ) {
        $error_code = sprintf "child died with signal %d, %s coredump\n", ( $CHILD_ERROR & 127 ), ( $CHILD_ERROR & 128 ) ? 'with' : 'without';
    }
    else {
        $error_code = $CHILD_ERROR >> 8;
    }

    croak( "\nCouldn't run $psql_call : " . $stderr ) if $error_code;
    return;
}
181
# Build the data file name for a blob: "data.<schema>.<table>.<id>.dump",
# with every character outside [a-zA-Z0-9] hex-escaped as _XX so the
# result is always a safe, unambiguous file name.
sub get_file_name {
    my $self = shift;
    my $blob = shift;

    my @parts = map {
        ( my $escaped = $_ ) =~ s/([^a-zA-Z0-9])/sprintf "_%02x", ord( $1 )/ges;
        $escaped;
    } ( 'data', @{ $blob }{ qw( schema table id ) }, 'dump' );

    return join '.', @parts;
}
196
# Sort blobs largest-first (ties broken by table name) so the longest
# dumps start as early as possible, then assign sequential ids in that
# order.
sub order_blobs {
    my $self = shift;

    my @ordered = sort {
        $b->{ 'size' } <=> $a->{ 'size' }
            ||
        $a->{ 'table' } cmp $b->{ 'table' }
    } @{ $self->{ 'blobs' } };

    my $next_id = 0;
    $_->{ 'id' } = $next_id++ for @ordered;

    $self->{ 'blobs' } = \@ordered;
    return;
}
204
# Convert the per-schema table hash into a flat list of "blobs" to dump.
# Tables at or below --max-size become one 'full' blob. Larger tables
# are split into 'partial' blobs along their single-column primary key,
# using the most-common-value histogram from pg_statistic as range
# boundaries. Replaces $self->{tables} with $self->{blobs}.
sub split_tables_into_blobs {
    my $self = shift;

    my @to_split = ();
    my @blobs    = ();
    my %oids     = ();

    while ( my ( $schema_name, $tables_hash ) = each %{ $self->{ 'tables' } } ) {
        while ( my ( $table_name, $table_data ) = each %{ $tables_hash } ) {
            # Small enough: dump the whole table as a single blob.
            if ( $table_data->{ 'size' } <= $self->{ 'max-size' } ) {
                $table_data->{ 'blob_type' } = 'full';
                push @blobs, $table_data;
                next;
            }
            push @to_split, $table_data;
            $oids{ $table_data->{ 'oid' } } = $table_data;
        }
    }
    if ( 0 == scalar @to_split ) {
        $self->{ 'blobs' } = \@blobs;
        return;
    }
    my $oids = join( ',', map { $_->{ 'oid' } } @to_split );

    # First primary-key column (attnum, name, type) for each big table.
    my $pkey_columns = $self->psql(
"select distinct on (i.indrelid) i.indrelid, a.attnum, a.attname, t.typname from pg_index i join pg_attribute a on i.indexrelid = a.attrelid join pg_type t on a.atttypid = t.oid where indrelid = ANY('{$oids}'::oid[]) and indisprimary order by i.indrelid, a.attnum"
    );
    croak( "pkey_columns is not arrayref? Something is wrong!" ) unless 'ARRAY' eq ref $pkey_columns;
    croak( "pkey_columns is not arrayref of arrayrefs? Something is wrong!" ) unless 'ARRAY' eq ref $pkey_columns->[ 0 ];

    # All histogram queries are written to one temp SQL file and run in
    # a single psql session via \i.
    my ( $sql_fh, $sql_filename ) = tempfile( 'fast.dump.XXXXXXXX', 'TMPDIR' => 1, );

    for my $row ( @{ $pkey_columns } ) {
        $oids{ $row->[ 0 ] }->{ 'partition_key' }    = $row->[ 2 ];
        $oids{ $row->[ 0 ] }->{ 'partition_values' } = [];
        # Unnest the stavalues histogram array (stakindN = 2) for this
        # table's pkey column; %s/%d placeholders are type name, table
        # oid and attnum.
        my $sql_query = sprintf q{
            SELECT
                s2.starelid,
                quote_literal(s2.vals[i])
            FROM
                (
                    SELECT
                        s1.*,
                        generate_series( array_lower( s1.vals, 1 ), array_upper( s1.vals, 1 ) ) as i
                    FROM
                        (
                            SELECT
                                s.starelid,
                                case
                                when s.stakind1 = 2 THEN stavalues1
                                when s.stakind2 = 2 THEN stavalues2
                                when s.stakind3 = 2 THEN stavalues3
                                when s.stakind4 = 2 THEN stavalues4
                                ELSE NULL
                                END::TEXT::%s[] as vals
                            FROM
                                pg_statistic s
                            WHERE
                                s.starelid = %d
                                AND staattnum = %d
                                AND ( 2 = s.stakind1 OR 2 = s.stakind2 OR 2 = s.stakind3 OR 2=s.stakind4)
                        ) as s1
                ) as s2
            ORDER BY s2.starelid, s2.vals[i];%s}, @{ $row }[ 3, 0, 1 ], "\n";
        print $sql_fh $sql_query;
    }
    close $sql_fh;

    my $partitions = $self->psql( '\i ' . $sql_filename );
    unlink $sql_filename;

    for my $row ( @{ $partitions } ) {
        push @{ $oids{ $row->[ 0 ] }->{ 'partition_values' } }, $row->[ 1 ];
    }

    for my $table ( @to_split ) {
        # No usable pkey/statistics: fall back to one full dump.
        if ( !defined $table->{ 'partition_key' } ) {
            $table->{ 'blob_type' } = 'full';
            push @blobs, $table;
            next;
        }
        # One partial blob per histogram boundary: (-inf, v0), then
        # [v(i-1), v(i)) ranges; the final [v(-1), +inf) range reuses
        # the original $table hash below.
        for my $i ( 0 .. $#{ $table->{ 'partition_values' } } ) {
            my $blob = {};
            # Shallow-copy the table's attributes into the new blob.
            @{ $blob }{ keys %{ $table } } = ( values %{ $table } );
            delete $blob->{ 'partition_key' };
            delete $blob->{ 'partition_values' };
            $blob->{ 'blob_type' } = 'partial';
            if ( $i == 0 ) {
                $blob->{ 'condition' } = sprintf "%s < %s", $table->{ 'partition_key' }, $table->{ 'partition_values' }->[ $i ];
                $blob->{ 'size' } = 0;
            }
            else {
                $blob->{ 'condition' } = sprintf "%s >= %s and %s < %s", $table->{ 'partition_key' }, $table->{ 'partition_values' }->[ $i - 1 ], $table->{ 'partition_key' },
                    $table->{ 'partition_values' }->[ $i ];
                # Approximate per-slice size: even split of the table.
                $blob->{ 'size' } /= ( scalar( @{ $table->{ 'partition_values' } } ) - 1 );
            }
            push @blobs, $blob;
        }
        $table->{ 'blob_type' } = 'partial';
        $table->{ 'size' } /= ( scalar( @{ $table->{ 'partition_values' } } ) - 1 );
        $table->{ 'condition' } = sprintf "%s >= %s", $table->{ 'partition_key' }, $table->{ 'partition_values' }->[ -1 ];
        delete $table->{ 'partition_key' };
        delete $table->{ 'partition_values' };
        push @blobs, $table;
    }
    $self->{ 'blobs' } = \@blobs;
    delete $self->{ 'tables' };
    return;
}
314
# Discover which tables the schema dump contains (via `pg_restore -l`
# on the TOC), then decorate each one with its qualified name, oid and
# size in megabytes from the live database. Result is stored as
# $self->{tables}{$schema}{$table} = { schema, table, full_name, size, oid }.
# Exits cleanly when the dump has no tables at all.
sub get_list_of_tables {
    my $self = shift;

    my $restored = $self->run_command( $self->{ 'pg_restore' }, '-l', File::Spec->catfile( $self->{ 'output' }, 'schema.dump' ) );

    my @lines = split /\r?\n/, $restored;
    my %tables = ();
    for my $line ( @lines ) {
        # TOC lines look like: "123; 1259 16384 TABLE schema name owner".
        next unless $line =~ m{\A\d+;\s+\d+\s+\d+\s+TABLE\s+(\S+)\s+(\S+)\s+};
        $tables{ $1 }->{ $2 } = { 'schema' => $1, 'table' => $2, };
    }
    if ( 0 == scalar keys %tables ) {
        print "This dump doesn't contain any tables.\n";
        exit 0;
    }

    # psql() already returns an arrayref of row-arrayrefs, so it can be
    # iterated directly. (A leftover "split" on this arrayref, which
    # only stringified the reference, has been removed.)
    my $db_sizes = $self->psql(
"SELECT n.nspname, c.relname, c.oid::regclass, c.oid, cast( pg_relation_size(c.oid) / ( 1024 * 1024 ) as int8 ) from pg_class c join pg_namespace n on c.relnamespace = n.oid where c.relkind = 'r'"
    );

    for my $row ( @{ $db_sizes } ) {
        my ( $schema, $table, $full_name, $oid, $size ) = @{ $row };
        # Ignore tables that exist in the database but not in the dump.
        next unless exists $tables{ $schema };
        next unless exists $tables{ $schema }->{ $table };
        $tables{ $schema }->{ $table }->{ 'full_name' } = $full_name;
        $tables{ $schema }->{ $table }->{ 'size' }      = $size;
        $tables{ $schema }->{ $table }->{ 'oid' }       = $oid;
    }

    $self->{ 'tables' } = \%tables;
    return;
}
348
# Dump the schema only (-s), in custom format (-Fc), into
# <output>/schema.dump. Table data is dumped separately by the workers.
sub dump_schema {
    my $self = shift;

    my $schema_file = File::Spec->catfile( $self->{ 'output' }, 'schema.dump' );
    $self->run_command( $self->{ 'pg_dump' }, '-Fc', '-f', $schema_file, '-s', '-v' );
    return;
}
354
# Interactive safety gate: warn that any other database connection can
# corrupt the dump, and proceed only when the user types exactly "YES".
# Anything else - including EOF on STDIN (non-interactive run) - exits.
sub confirm_work {
    my $self = shift;
    printf "\n\nAre you sure you want to continue?\n";
    printf "Please remember that any other ( aside from $PROGRAM_NAME ) connections to database can cause dump corruption!\n";
    printf "Enter YES to continue: ";
    my $input = <STDIN>;

    # <STDIN> returns undef on EOF; guard it so the regex match below
    # doesn't warn about an uninitialized value.
    exit unless defined $input;
    exit unless $input =~ m{\AYES\r?\n?\z};
    return;
}
364
# Print a pre-flight summary: effective configuration, connection
# details (user/database), and the five largest tables with their
# pretty-printed sizes, so the user can sanity-check before confirming.
sub show_running_details {
    my $self = shift;

    my $db = $self->psql( 'SELECT current_user, current_database()' );

    # One column per row: "padded_table_name (size)" for the 5 largest
    # ordinary tables, excluding pg_* catalogs.
    my $largest_tables = $self->psql(
"SELECT * FROM ( SELECT rpad(oid::regclass::text, 32) || ' (' || pg_size_pretty(pg_relation_size(oid)) || ')' from pg_class where relkind = 'r' and relname !~ '^pg_' order by pg_relation_size(oid) desc limit 5) x order by 1"
    );

    my @tables = map { $_->[ 0 ] } @{ $largest_tables };

    printf "Config:\n";
    for my $key ( sort keys %{ $self } ) {
        printf "%-10s : %s\n", $key, $self->{ $key };
    }

    printf "\nDatabase details:\n";
    printf "User          : %s\n", $db->[ 0 ]->[ 0 ];
    printf "Database      : %s\n", $db->[ 0 ]->[ 1 ];
    # First table goes on the header line; the rest are indented under it.
    printf "Sample tables : %s\n", shift @tables;
    printf "              - %s\n", $_ for @tables;
    return;
}
388
# Parse and validate command-line options (see POD below for the list),
# then copy them onto $self. Exits via pod2usage() on any invalid or
# missing value.
sub read_options {
    my $self = shift;

    # Defaults; GetOptions() overwrites entries in this hash in place.
    my $opts = {
        'psql'       => 'psql',
        'pg_dump'    => 'pg_dump',
        'pg_restore' => 'pg_restore',
        'output'     => '.',
        'jobs'       => 1,
        'max-size'   => 10,
    };
    my $is_ok = GetOptions( $opts, qw( help|? output|o=s compressor|c=s jobs|j=i max-size|m=i psql|p=s pg_dump|d=s pg_restore|r=s ) );
    pod2usage( '-verbose' => 1, ) unless $is_ok;
    pod2usage( '-verbose' => 99, '-sections' => [ qw( DESCRIPTION SYNOPSIS OPTIONS ) ] ) if $opts->{ 'help' };

    # Output directory must exist and be a writable directory.
    pod2usage( '-message' => 'Output directory has to be given.' ) if !$opts->{ 'output' };
    pod2usage( '-message' => 'Output directory does not exist.' )  if !-e $opts->{ 'output' };
    pod2usage( '-message' => 'Output is not directory.' )          if !-d $opts->{ 'output' };
    pod2usage( '-message' => 'Output directory is not writable.' ) if !-w $opts->{ 'output' };

    # --jobs: integer in [1, 100].
    pod2usage( '-message' => 'Number of jobs has to be not-empty.' ) if '' eq $opts->{ 'jobs' };
    $opts->{ 'jobs' } = int( $opts->{ 'jobs' } );
    pod2usage( '-message' => 'Number of jobs cannot be less than 1.' )   if 1 > $opts->{ 'jobs' };
    pod2usage( '-message' => 'Number of jobs cannot be more than 100.' ) if 100 < $opts->{ 'jobs' };

    # --max-size: integer >= 1 (megabytes).
    pod2usage( '-message' => 'Max-size has to be not-empty.' ) if '' eq $opts->{ 'max-size' };
    $opts->{ 'max-size' } = int( $opts->{ 'max-size' } );
    pod2usage( '-message' => 'Max-size cannot be less than 1.' ) if 1 > $opts->{ 'max-size' };

    # Absolute path, so chdir-ing children still find it; then copy all
    # options onto the object via a hash slice.
    $opts->{ 'output' } = abs_path( $opts->{ 'output' } );
    @{ $self }{ keys %{ $opts } } = values %{ $opts };
    return;
}
422
# Run one SQL query through psql (quiet, tuples-only, unaligned, tab as
# field separator) and parse the output into an arrayref of rows, each
# row an arrayref of column values. Empty lines are skipped.
sub psql {
    my ( $self, $query ) = @_;

    my $output = $self->run_command( $self->{ 'psql' }, '-qAtX', '-F', "\t", '-c', $query, );

    my @data;
    for my $line ( split /\r?\n/, $output ) {
        next if '' eq $line;
        push @data, [ split /\t/, $line ];
    }
    return \@data;
}
431
# Run an external command (given as a list; every element is
# quotemeta'd before being joined for the shell), capturing stdout and
# stderr into temp files. Returns captured stdout; croaks with captured
# stderr if the command fails in any way.
sub run_command {
    my $self = shift;
    my ( @cmd ) = @_;

    my $real_command = join( ' ', map { quotemeta } @cmd );

    # The already-open tempfile handles are read back after system()
    # returns - they start at position 0, so they see what the child
    # wrote to the files by name.
    my ( $stdout_fh, $stdout_filename ) = tempfile( 'fast.dump.XXXXXXXX', 'TMPDIR' => 1, );
    my ( $stderr_fh, $stderr_filename ) = tempfile( 'fast.dump.XXXXXXXX', 'TMPDIR' => 1, );

    $real_command .= sprintf ' 2>%s >%s', quotemeta $stderr_filename, quotemeta $stdout_filename;

    system $real_command;
    # Slurp mode: read each capture file in one go.
    local $/ = undef;
    my $stdout = <$stdout_fh>;
    my $stderr = <$stderr_fh>;

    close $stdout_fh;
    close $stderr_fh;

    unlink( $stdout_filename, $stderr_filename );

    # Decode $CHILD_ERROR ($?): -1 = couldn't run at all, low 7 bits =
    # killing signal, otherwise exit status is in the high byte.
    my $error_code;
    if ( $CHILD_ERROR == -1 ) {
        $error_code = $OS_ERROR;
    }
    elsif ( $CHILD_ERROR & 127 ) {
        $error_code = sprintf "child died with signal %d, %s coredump\n", ( $CHILD_ERROR & 127 ), ( $CHILD_ERROR & 128 ) ? 'with' : 'without';
    }
    else {
        $error_code = $CHILD_ERROR >> 8;
    }

    croak( "Couldn't run $real_command : " . $stderr ) if $error_code;

    return $stdout;
}
468
469 =head1 NAME
470
471 fast.dump - Program to do very fast dumps of PostgreSQL database
472
473 =head1 SYNOPSIS
474
475 fast.dump [--output=directory/] [--compressor=/usr/bin/gzip] [--jobs=n] [--max-size=n] [--psql=/usr/bin/psql] [--pg_dump=/usr/bin/pg_dump] [--pg_restore=/usr/bin/pg_restore] [--help]
476
477 =head1 OPTIONS
478
479 =over
480
481 =item --output - Directory where to output dump files. Defaults to current directory.
482
483 =item --compressor - path to compressor that should be used to compress
484 data. Default is empty, which doesn't compress, and you'll usually want
485 something like gzip.
486
487 =item --jobs - how many concurrent processes to run when dumping data to
488 tables. Defaults to 1.
489
490 =item --max-size - Minimal size of table (pg_relation_size()) (in megabytes)
491 before fast.dump will try to split it into many separate blocks. Defaults to
492 10.
493
494 =item --psql - path to psql program. Defaults to "psql", which will use
495 $PATH environment variable to find it.
496
497 =item --pg_dump - path to pg_dump program. Defaults to "pg_dump", which will
498 use $PATH environment variable to find it.
499
500 =item --pg_restore - path to pg_restore program. Defaults to "pg_restore",
501 which will use $PATH environment variable to find it.
502
503 =item --help - shows information about usage of the program.
504
505 =back
506
507 All options can be given in abbreviated version, using single dash character
508 and first letter of option, like:
509
510     fast.dump -o /tmp -c bzip2 -j 16
511
512 Database connection details should be given using PG* environment variables.
513
514 =head1 DESCRIPTION
515
516 fast.dump is used to make very fast, although requiring special attention,
517 database dumps.
518
519 It works with PostgreSQL database, and will produce consistent dump only if
520 there are no other connections to database (other than fast.dump itself).
521
522 Generated dumps have form of directory with many files inside, and should be
523 loaded using fast.restore counterpart tool.
524
Note: See TracBrowser for help on using the browser.