root/trunk/tools/fast.dump.and.restore/fast.dump

Revision 251, 19.5 kB (checked in by depesz, 3 years ago)

1. use pg_relation_size(table) + pg_relation_size( toast ) for size estimates
2. use most-common-values if histogram is not present

  • Property svn:executable set to *
Line 
#!/usr/bin/env perl
# Entry point: construct the FastDump application object and run it.
package main;
use strict;
use warnings;
Omni::Program::Pg::FastDump->new()->run();
exit;
8
9 package Omni::Program::Pg::FastDump;
10 use strict;
11 use warnings;
12 use Carp qw( croak carp );
13 use English qw( -no_match_vars );
14 use Getopt::Long qw( :config no_ignore_case );
15 use Data::Dumper;
16 use Cwd qw( abs_path );
17 use Pod::Usage;
18 use POSIX qw( :sys_wait_h );
19 use File::Spec;
20 use File::Temp qw( tempfile );
21
22 our %killed_pids = ();
23
# SIGCHLD handler: reap every child that has exited (non-blocking) and
# record its time of death in the global %killed_pids, keyed by pid, so
# the master loop can account for finished workers.
sub REAPER {
    while ( 1 ) {
        my $pid = waitpid( -1, WNOHANG );
        last unless $pid > 0;
        $killed_pids{ $pid } = time();
    }
    # Re-install ourselves, for systems with one-shot signal semantics.
    $SIG{ 'CHLD' } = \&REAPER;
    return;
}
32
# Plain constructor: bless an empty hash into the requested class.
sub new {
    my ( $class ) = @_;
    return bless {}, $class;
}
39
# Top-level workflow: parse options, show what will happen, ask the
# operator for confirmation, then perform the actual dump.
sub run {
    my ( $self ) = @_;
    for my $step ( qw( read_options show_running_details confirm_work make_dump ) ) {
        $self->$step();
    }
    return;
}
49
# Dump pipeline: schema first, then discover tables, split the big ones
# into blobs, order blobs by size, and finally run the parallel dumpers.
sub make_dump {
    my ( $self ) = @_;
    for my $phase ( qw( dump_schema get_list_of_tables split_tables_into_blobs order_blobs launch_dumpers ) ) {
        $self->$phase();
    }
    return;
}
59
# Master loop: write index.lst describing every blob, then keep up to
# $self->{'jobs'} forked workers dumping blobs in parallel until both
# the pending queue and the set of running workers are empty.
sub launch_dumpers {
    my $self = shift;
    # Unbuffered output so the "\r"-based progress line updates immediately.
    $OUTPUT_AUTOFLUSH = 1;
    # Children are reaped asynchronously; REAPER fills the global %killed_pids.
    $SIG{ 'CHLD' } = \&REAPER;

    # Count blobs per type (total / full / partial) for the summary line.
    my %c = map { ( $_ => 0 ) } qw( total partial full );
    for my $t ( @{ $self->{ 'blobs' } } ) {
        $c{ 'total' }++;
        $c{ $t->{ 'blob_type' } }++;
    }

    # index.lst lists every blob; presumably consumed by fast.restore -- TODO confirm.
    open my $fh, ">", File::Spec->catfile( $self->{ 'output' }, 'index.lst' ) or croak( "Cannot create index file: $OS_ERROR\n" );
    printf $fh '%-5s | %-7s | %-32s | %-32s | %-10s | %s%s', '#', qw( type schema table size condition ), "\n";
    for my $i ( @{ $self->{'blobs'} } ) {
        printf $fh '%5d | %-7s | %-32s | %-32s | %-10s | %s%s', @{ $i }{ qw( id blob_type schema table size ) }, ( $i->{'condition'} || '' ), "\n";
    }

    printf "%d blobs to be processed. %d full and %d partial.\n", @c{ qw( total full partial ) };
    my %running_kids = ();
    while ( 1 ) {
        # Account for any workers that exited since the last iteration.
        my @pids = keys %killed_pids;
        for my $killed ( @pids ) {
            my $blob     = delete $running_kids{ $killed };
            next unless $blob;
            my $end_time = delete $killed_pids{ $killed };
            # NOTE(review): this "finished" line is appended to the index
            # file handle ($fh), not printed to stdout -- verify intended.
            printf $fh "%s dump (#%d) of %s.%s finished after %d seconds.\n", $blob->{ 'blob_type' }, $blob->{ 'id' }, $blob->{ 'schema' }, $blob->{ 'table' }, $end_time - $blob->{ 'started' };
        }
        # Refill the worker pool up to the configured number of jobs.
        while ( $self->{ 'jobs' } > scalar keys %running_kids ) {
            last if 0 == scalar @{ $self->{ 'blobs' } };
            my $blob = shift @{ $self->{ 'blobs' } };
            my $pid  = fork();
            croak "cannot fork" unless defined $pid;
            if ( $pid == 0 ) {

                # It's worker process.
                delete $SIG{ 'CHLD' };
                $self->make_single_blob( $blob );
                exit;
            }

            # It's master.
            $blob->{ 'started' } = time();
            $running_kids{ $pid } = $blob;
        }

        # Progress display: what is currently running vs. still queued.
        my %what_runs = map { ( $_ => 0 ) } qw( total partial full );
        for my $t ( values %running_kids ) {
            $what_runs{ 'total' }++;
            $what_runs{ $t->{ 'blob_type' } }++;
        }
        my %what_waits = map { ( $_ => 0 ) } qw( total partial full );
        for my $t ( @{ $self->{ 'blobs' } } ) {
            $what_waits{ 'total' }++;
            $what_waits{ $t->{ 'blob_type' } }++;
        }
        printf 'Running: %3d workers (%3d full, %5d partial). Pending: %3d blobs (%3d full, %5d partial).%s',
            $what_runs{ 'total' },
            $what_runs{ 'full' },
            $what_runs{ 'partial' },
            $what_waits{ 'total' }, $what_waits{ 'full' }, $what_waits{ 'partial' }, "     \r",    # just some spaces to clean leftover chars, and rewind to beginning of line
            ;
        last if ( 0 == $what_runs{ 'total' } ) && ( 0 == $what_waits{ 'total' } );
        sleep 60;                                                                                  # sleep will get interrupted when child exits, and then the loop will repeat.
    }
    printf '%sAll done.%s', "\n", "\n";
    return;
}
127
# Worker-process body: dump one blob (a whole table, or one pkey-range
# slice of a split table) to its own file under $self->{'output'}.
# Writes a one-off .sql script containing the COPY command, runs it via
# psql (stdout optionally piped through the configured compressor), and
# croaks with psql's captured stderr if the pipeline fails.
sub make_single_blob {
    my $self = shift;
    my $blob = shift;

    my $file_name = $self->get_file_name( $blob );
    # Show the target file in `ps` output while the dump runs.
    $PROGRAM_NAME .= ' ... ' . $file_name;

    my $output_path = File::Spec->catfile( $self->{ 'output' }, $file_name );
    my $sql_filename = $output_path . '.sql';
    open my $sql_fh, '>', $sql_filename or croak( "Cannot write to $sql_filename: $OS_ERROR\n" );

    if ( $blob->{ 'blob_type' } eq 'partial' ) {
        # Ranged COPY over the pkey; seqscans are disabled so the slice
        # is fetched via the primary-key index.
        print $sql_fh "set enable_seqscan = false;\n";
        printf $sql_fh 'COPY (SELECT * FROM %s WHERE %s) TO stdout;%s', $blob->{ 'full_name' }, $blob->{ 'condition' }, "\n";
        $PROGRAM_NAME = 'Partial dump of ' . $blob->{ 'full_name' };
    }
    else {
        printf $sql_fh 'COPY %s TO stdout;%s', $blob->{ 'full_name' }, "\n";
        $PROGRAM_NAME = 'Full dump of ' . $blob->{ 'full_name' };
    }
    close $sql_fh;

    # NOTE(review): the .sql script stays on disk beside the dump file --
    # confirm that is intentional (it does document what was executed).
    my @cmd = ( $self->{ 'psql' }, '-qAtX', '-f', $sql_filename, );

    # Build a shell pipeline string; every element is quotemeta'd so the
    # shell cannot reinterpret paths or arguments.
    my $psql_call = join( ' ', map { quotemeta } @cmd );
    if ( $self->{ 'compressor' } ) {
        $psql_call .= ' | ' . quotemeta( $self->{ 'compressor' } ) . ' -c -';
    }

    # Capture stderr in a tempfile; stdout (the COPY stream) goes
    # straight to the final output file.
    my ( $stderr_fh, $stderr_filename ) = tempfile( 'fast.dump.XXXXXXXX', 'TMPDIR' => 1, );

    $psql_call .= sprintf ' 2>%s >%s', quotemeta $stderr_filename, quotemeta $output_path;

    system $psql_call;
    # Slurp whatever the pipeline wrote to stderr; our handle was opened
    # at offset 0 before the child wrote, so a plain read sees it all.
    local $/ = undef;
    my $stderr = <$stderr_fh>;

    close $stderr_fh;
    unlink( $stderr_filename );

    # Decode system()'s status word (see perldoc -f system):
    # -1 = couldn't launch; low 7 bits = killing signal; else exit code.
    my $error_code;
    if ( $CHILD_ERROR == -1 ) {
        $error_code = $OS_ERROR;
    }
    elsif ( $CHILD_ERROR & 127 ) {
        $error_code = sprintf "child died with signal %d, %s coredump\n", ( $CHILD_ERROR & 127 ), ( $CHILD_ERROR & 128 ) ? 'with' : 'without';
    }
    else {
        $error_code = $CHILD_ERROR >> 8;
    }

    croak( "\nCouldn't run $psql_call : " . $stderr ) if $error_code;
    return;
}
182
# Build a filesystem-safe dump file name for the given blob.
# Every character outside [a-zA-Z0-9] in each name part is replaced by
# "_xx" (two lowercase hex digits of its ordinal), so schema/table names
# cannot escape the output directory or collide with the "." separators.
# Result looks like: data.<schema>.<table>.<id>.dump
sub get_file_name {
    my ( $self, $blob ) = @_;

    my @safe_parts = map {
        ( my $escaped = $_ ) =~ s/([^a-zA-Z0-9])/sprintf "_%02x", ord( $1 )/ges;
        $escaped;
    } ( 'data', @{ $blob }{ qw( schema table id ) }, 'dump' );

    return join '.', @safe_parts;
}
197
# Sort blobs so the biggest dumps start first (size descending, ties
# broken by table name ascending), then assign each blob a sequential
# id reflecting its position in that order.
sub order_blobs {
    my ( $self ) = @_;
    my @sorted = sort {
        $b->{ 'size' } <=> $a->{ 'size' }
            ||
        $a->{ 'table' } cmp $b->{ 'table' }
    } @{ $self->{ 'blobs' } };
    my $rank = 0;
    $_->{ 'id' } = $rank++ for @sorted;
    $self->{ 'blobs' } = \@sorted;
    return;
}
205
# Decide how each table will be dumped. Tables at or below max-size (kB)
# become a single "full" blob. Larger tables are split into "partial"
# blobs using the first primary-key column's statistics from
# pg_statistic -- histogram bounds (stakind = 2) preferred, falling back
# to most-common-values (stakind = 1) -- as range boundaries. Tables
# with no usable pkey statistics fall back to one full dump. The result
# lands in $self->{'blobs'}; $self->{'tables'} is consumed and deleted.
sub split_tables_into_blobs {
    my $self = shift;

    my @to_split = ();
    my @blobs    = ();
    my %oids     = ();    # table oid => table-info, for tables to be split

    while ( my ( $schema_name, $tables_hash ) = each %{ $self->{ 'tables' } } ) {
        while ( my ( $table_name, $table_data ) = each %{ $tables_hash } ) {
            # Small enough: dump in one piece.
            if ( $table_data->{ 'size' } <= $self->{ 'max-size' } ) {
                $table_data->{ 'blob_type' } = 'full';
                push @blobs, $table_data;
                next;
            }
            push @to_split, $table_data;
            $oids{ $table_data->{ 'oid' } } = $table_data;
        }
    }
    # Nothing exceeded max-size: we are done.
    if ( 0 == scalar @to_split ) {
        $self->{ 'blobs' } = \@blobs;
        return;
    }
    my $oids = join( ',', map { $_->{ 'oid' } } @to_split );

    # First (lowest attnum) primary-key column of each big table, plus its
    # type name -- the type is needed to cast the stats values below.
    my $pkey_columns = $self->psql(
"select distinct on (i.indrelid) i.indrelid, a.attnum, a.attname, t.typname from pg_index i join pg_attribute a on i.indexrelid = a.attrelid join pg_type t on a.atttypid = t.oid where indrelid = ANY('{$oids}'::oid[]) and indisprimary order by i.indrelid, a.attnum"
    );
    croak( "pkey_columns is not arrayref? Something is wrong!" ) unless 'ARRAY' eq ref $pkey_columns;
    croak( "pkey_columns is not arrayref of arrayrefs? Something is wrong!" ) unless 'ARRAY' eq ref $pkey_columns->[ 0 ];

    # Build one SQL script (one query per table) and run it in a single
    # psql session via \i -- cheaper than one psql invocation per table.
    my ( $sql_fh, $sql_filename ) = tempfile( 'fast.dump.XXXXXXXX', 'TMPDIR' => 1, );

    for my $row ( @{ $pkey_columns } ) {
        # $row = [ table oid, pkey attnum, pkey attname, pkey typname ]
        $oids{ $row->[ 0 ] }->{ 'partition_key' }    = $row->[ 2 ];
        $oids{ $row->[ 0 ] }->{ 'partition_values' } = [];
        # Unnest the stats array for the pkey column, quoting each value
        # so it can be pasted verbatim into a WHERE condition.
        my $sql_query = sprintf q{
            SELECT
                s2.starelid,
                quote_literal(s2.vals[i])
            FROM
                (
                    SELECT
                        s1.*,
                        generate_series( array_lower( s1.vals, 1 ), array_upper( s1.vals, 1 ) ) as i
                    FROM
                        (
                            SELECT
                                s.starelid,
                                case
                                when s.stakind1 = 2 THEN stavalues1
                                when s.stakind2 = 2 THEN stavalues2
                                when s.stakind3 = 2 THEN stavalues3
                                when s.stakind4 = 2 THEN stavalues4
                                when s.stakind1 = 1 THEN stavalues1
                                when s.stakind2 = 1 THEN stavalues2
                                when s.stakind3 = 1 THEN stavalues3
                                when s.stakind4 = 1 THEN stavalues4
                                ELSE NULL
                                END::TEXT::%s[] as vals
                            FROM
                                pg_statistic s
                            WHERE
                                s.starelid = %d
                                AND staattnum = %d
                                AND ( 2 in (s.stakind1, s.stakind2, s.stakind3, s.stakind4) OR 1 in (s.stakind1, s.stakind2, s.stakind3, s.stakind4) )
                        ) as s1
                ) as s2
            ORDER BY s2.starelid, s2.vals[i];%s}, @{ $row }[ 3, 0, 1 ], "\n";
        print $sql_fh $sql_query;
    }
    close $sql_fh;

    my $partitions = $self->psql( '\i ' . $sql_filename );
    unlink $sql_filename;

    # Group the returned boundary values per table oid.
    for my $row ( @{ $partitions } ) {
        push @{ $oids{ $row->[ 0 ] }->{ 'partition_values' } }, $row->[ 1 ];
    }

    for my $table ( @to_split ) {
        # No pkey stats available: fall back to a single full dump.
        if (
            ( !defined $table->{ 'partition_values' } ) ||
            ( 0 == scalar @{ $table->{ 'partition_values' } } )
        ) {
            $table->{ 'blob_type' } = 'full';
            push @blobs, $table;
            next;
        }
        # One partial blob per boundary value: (-inf, v0), [v0, v1), ...
        for my $i ( 0 .. $#{ $table->{ 'partition_values' } } ) {
            my $blob = {};
            # Shallow copy of the table info (keys/values pair up in order).
            @{ $blob }{ keys %{ $table } } = ( values %{ $table } );
            delete $blob->{ 'partition_key' };
            delete $blob->{ 'partition_values' };
            $blob->{ 'blob_type' } = 'partial';
            if ( $i == 0 ) {
                $blob->{ 'condition' } = sprintf "%s < %s", $table->{ 'partition_key' }, $table->{ 'partition_values' }->[ $i ];
                # Presumably expected to hold few rows (below the lowest
                # boundary), hence size estimate 0 -- TODO confirm.
                $blob->{ 'size' } = 0;
            }
            else {
                $blob->{ 'condition' } = sprintf "%s >= %s and %s < %s", $table->{ 'partition_key' }, $table->{ 'partition_values' }->[ $i - 1 ], $table->{ 'partition_key' },
                    $table->{ 'partition_values' }->[ $i ];
                # Size estimate: table size spread evenly over the ranges.
                $blob->{ 'size' } /= ( scalar( @{ $table->{ 'partition_values' } } ) - 1 );
            }
            push @blobs, $blob;
        }
        # The table record itself becomes the final ">= last boundary" blob.
        $table->{ 'blob_type' } = 'partial';
        $table->{ 'size' } /= ( scalar( @{ $table->{ 'partition_values' } } ) - 1 );
        # NOTE(review): leftover debug output below -- consider removing
        # or converting to a proper warning.
        if ( !defined $table->{'partition_values'}->[-1] ) {
            print Dumper( $table ) . "PV\n";
        }
        if ( !defined $table->{'partition_key'} ) {
            print Dumper( $table ) . "PK\n";
        }
        $table->{ 'condition' } = sprintf "%s >= %s", $table->{ 'partition_key' }, $table->{ 'partition_values' }->[ -1 ];
        delete $table->{ 'partition_key' };
        delete $table->{ 'partition_values' };
        push @blobs, $table;
    }
    $self->{ 'blobs' } = \@blobs;
    delete $self->{ 'tables' };
    return;
}
328
# Build $self->{'tables'}: a two-level hash (schema => table => info) of
# every table present in the schema dump, enriched with its oid, full
# (regclass) name, and estimated size in kilobytes (pg_relation_size of
# the heap plus its TOAST table, if one exists). Exits cleanly if the
# schema dump contains no tables at all.
sub get_list_of_tables {
    my $self = shift;

    # List the archive contents; TABLE entries identify what to dump.
    my $restored = $self->run_command( $self->{ 'pg_restore' }, '-l', File::Spec->catfile( $self->{ 'output' }, 'schema.dump' ) );

    my @lines = split /\r?\n/, $restored;
    my %tables = ();
    for my $line ( @lines ) {
        # pg_restore -l lines look like: "123; 1259 16384 TABLE schema name owner".
        next unless $line =~ m{\A\d+;\s+\d+\s+\d+\s+TABLE\s+(\S+)\s+(\S+)\s+};
        $tables{ $1 }->{ $2 } = { 'schema' => $1, 'table' => $2, };
    }
    if ( 0 == scalar keys %tables ) {
        print "This dump doesn't contain any tables.\n";
        exit 0;
    }

    # Size estimate per table: heap + TOAST relation size, in kB.
    my $db_sizes = $self->psql(
"
    SELECT
        n.nspname,
        c.relname,
        c.oid::regclass,
        c.oid,
        cast(
            (
                pg_relation_size(c.oid)
                +
                (
                    CASE
                        WHEN tn.oid IS NOT NULL THEN pg_relation_size( tc.oid )
                        ELSE 0
                    END
                )
            ) / 1024 as int8
        )
    FROM
        pg_class c
        join pg_namespace n on c.relnamespace = n.oid
        left outer join pg_class tc on tc.relname = 'pg_toast_' || c.oid
        left outer join pg_namespace tn on tc.relnamespace = tn.oid and tn.nspname = 'pg_toast'
    WHERE
        c.relkind = 'r'
"
    );

    # psql() already returns parsed rows (arrayref of arrayrefs); the old
    # "@lines = split /\r?\n/, $db_sizes" here stringified that arrayref
    # into nonsense and was dead code -- removed.
    for my $row ( @{ $db_sizes } ) {
        my ( $schema, $table, $full_name, $oid, $size ) = @{ $row };
        # Skip relations that are not part of the schema dump.
        next unless exists $tables{ $schema };
        next unless exists $tables{ $schema }->{ $table };
        $tables{ $schema }->{ $table }->{ 'full_name' } = $full_name;
        $tables{ $schema }->{ $table }->{ 'size' }      = $size;
        $tables{ $schema }->{ $table }->{ 'oid' }       = $oid;
    }

    $self->{ 'tables' } = \%tables;
    return;
}
387
# Dump the schema only (-s), custom format (-Fc), verbosely, into
# <output>/schema.dump. Data is dumped separately, per blob.
sub dump_schema {
    my ( $self ) = @_;
    my $schema_file = File::Spec->catfile( $self->{ 'output' }, 'schema.dump' );
    $self->run_command( $self->{ 'pg_dump' }, '-Fc', '-f', $schema_file, '-s', '-v' );
    return;
}
393
# Ask the operator for explicit confirmation before dumping; exits the
# program unless the answer is literally "YES" (EOF counts as refusal).
sub confirm_work {
    my $self = shift;
    printf "\n\nAre you sure you want to continue?\n";
    # $PROGRAM_NAME goes through %s, not the format string itself -- a
    # path containing '%' must not be treated as printf directives.
    printf "Please remember that any other ( aside from %s ) connections to database can cause dump corruption!\n", $PROGRAM_NAME;
    printf "Enter YES to continue: ";
    my $input = <STDIN>;
    # EOF on stdin (e.g. closed pipe) means no confirmation was given.
    exit unless defined $input;
    exit unless $input =~ m{\AYES\r?\n?\z};
    return;
}
403
# Print the effective configuration and a few details about the target
# database (current user, database name, the five largest tables) so
# the operator can sanity-check before confirming the dump.
sub show_running_details {
    my ( $self ) = @_;

    my $whoami = $self->psql( 'SELECT current_user, current_database()' );

    my $largest_tables = $self->psql(
"SELECT * FROM ( SELECT rpad(oid::regclass::text, 32) || ' (' || pg_size_pretty(pg_relation_size(oid)) || ')' from pg_class where relkind = 'r' and relname !~ '^pg_' order by pg_relation_size(oid) desc limit 5) x order by 1"
    );

    my @samples = map { $_->[ 0 ] } @{ $largest_tables };

    printf "Config:\n";
    printf "%-10s : %s\n", $_, $self->{ $_ } for sort keys %{ $self };

    printf "\nDatabase details:\n";
    printf "User          : %s\n", $whoami->[ 0 ]->[ 0 ];
    printf "Database      : %s\n", $whoami->[ 0 ]->[ 1 ];
    my $first_sample = shift @samples;
    printf "Sample tables : %s\n", $first_sample;
    printf "              - %s\n", $_ for @samples;
    return;
}
427
# Parse and validate command-line options, then merge them (with their
# defaults) into $self. Any validation failure reports via pod2usage.
sub read_options {
    my $self = shift;

    # Defaults; GetOptions fills this hash in place.
    my $cfg = {
        'psql'       => 'psql',
        'pg_dump'    => 'pg_dump',
        'pg_restore' => 'pg_restore',
        'output'     => '.',
        'jobs'       => 1,
        'max-size'   => 10240,
    };
    pod2usage( '-verbose' => 1, )
        unless GetOptions( $cfg, qw( help|? output|o=s compressor|c=s jobs|j=i max-size|m=i psql|p=s pg_dump|d=s pg_restore|r=s ) );
    pod2usage( '-verbose' => 99, '-sections' => [ qw( DESCRIPTION SYNOPSIS OPTIONS ) ] ) if $cfg->{ 'help' };

    # Output directory must exist and be a writable directory.
    pod2usage( '-message' => 'Output directory has to be given.' ) unless $cfg->{ 'output' };
    pod2usage( '-message' => 'Output directory does not exist.' )  unless -e $cfg->{ 'output' };
    pod2usage( '-message' => 'Output is not directory.' )          unless -d $cfg->{ 'output' };
    pod2usage( '-message' => 'Output directory is not writable.' ) unless -w $cfg->{ 'output' };

    # Worker count: an integer between 1 and 100.
    pod2usage( '-message' => 'Number of jobs has to be not-empty.' ) if '' eq $cfg->{ 'jobs' };
    $cfg->{ 'jobs' } = int( $cfg->{ 'jobs' } );
    pod2usage( '-message' => 'Number of jobs cannot be less than 1.' )   if 1 > $cfg->{ 'jobs' };
    pod2usage( '-message' => 'Number of jobs cannot be more than 100.' ) if 100 < $cfg->{ 'jobs' };

    # Split threshold (kB): a positive integer.
    pod2usage( '-message' => 'Max-size has to be not-empty.' ) if '' eq $cfg->{ 'max-size' };
    $cfg->{ 'max-size' } = int( $cfg->{ 'max-size' } );
    pod2usage( '-message' => 'Max-size cannot be less than 1.' ) if 1 > $cfg->{ 'max-size' };

    $cfg->{ 'output' } = abs_path( $cfg->{ 'output' } );
    @{ $self }{ keys %{ $cfg } } = values %{ $cfg };
    return;
}
461
# Run one query (or metacommand) through psql and return its output as
# an arrayref of rows, each row an arrayref of tab-separated columns.
# Blank output lines are discarded.
sub psql {
    my ( $self, $query ) = @_;
    my $raw = $self->run_command( $self->{ 'psql' }, '-qAtX', '-F', "\t", '-c', $query, );
    my @parsed;
    for my $line ( split /\r?\n/, $raw ) {
        next if '' eq $line;
        push @parsed, [ split /\t/, $line ];
    }
    return \@parsed;
}
470
# Run an external command (given as a list, each element shell-escaped),
# capturing stdout and stderr in tempfiles. Returns captured stdout;
# croaks with the captured stderr if the command failed to launch, was
# killed by a signal, or exited non-zero.
sub run_command {
    my ( $self, @cmd ) = @_;

    my $shell_cmd = join ' ', map { quotemeta } @cmd;

    my ( $out_fh, $out_name ) = tempfile( 'fast.dump.XXXXXXXX', 'TMPDIR' => 1, );
    my ( $err_fh, $err_name ) = tempfile( 'fast.dump.XXXXXXXX', 'TMPDIR' => 1, );
    $shell_cmd .= sprintf ' 2>%s >%s', quotemeta $err_name, quotemeta $out_name;

    system $shell_cmd;

    # Slurp both capture files (handles were opened at offset 0 before
    # the child wrote to them).
    my ( $stdout, $stderr );
    {
        local $/ = undef;
        $stdout = <$out_fh>;
        $stderr = <$err_fh>;
    }
    close $out_fh;
    close $err_fh;
    unlink( $out_name, $err_name );

    # Decode system()'s status word (see perldoc -f system).
    my $failure;
    if ( -1 == $CHILD_ERROR ) {
        $failure = $OS_ERROR;
    }
    elsif ( $CHILD_ERROR & 127 ) {
        $failure = sprintf "child died with signal %d, %s coredump\n", ( $CHILD_ERROR & 127 ), ( $CHILD_ERROR & 128 ) ? 'with' : 'without';
    }
    else {
        $failure = $CHILD_ERROR >> 8;
    }

    croak( "Couldn't run $shell_cmd : " . $stderr ) if $failure;

    return $stdout;
}
507
508 =head1 NAME
509
510 fast.dump - Program to do very fast dumps of PostgreSQL database
511
512 =head1 SYNOPSIS
513
514 fast.dump [--output=directory/] [--compressor=/usr/bin/gzip] [--jobs=n] [--max-size=n] [--psql=/usr/bin/psql] [--pg_dump=/usr/bin/pg_dump] [--pg_restore=/usr/bin/pg_restore] [--help]
515
516 =head1 OPTIONS
517
518 =over
519
520 =item --output - Directory where to output dump files. Defaults to current directory.
521
522 =item --compressor - path to compressor that should be used to compress
523 data. Default is empty, which doesn't compress, and you'll usually want
524 something like gzip.
525
526 =item --jobs - how many concurrent processes to run when dumping data to
527 tables. Defaults to 1.
528
529 =item --max-size - Minimal size of table (pg_relation_size()) (in kilobytes)
530 before fast.dump will try to split it into many separate blocks. Defaults to
531 10240 (10MB).
532
533 =item --psql - path to psql program. Defaults to "psql", which will use
534 $PATH environment variable to find it.
535
536 =item --pg_dump - path to pg_dump program. Defaults to "pg_dump", which will
537 use $PATH environment variable to find it.
538
539 =item --pg_restore - path to pg_restore program. Defaults to "pg_restore",
540 which will use $PATH environment variable to find it.
541
542 =item --help - shows information about usage of the program.
543
544 =back
545
546 All options can be given in abbreviated version, using single dash character
547 and first letter of option, like:
548
549     fast.dump -o /tmp -c bzip2 -j 16
550
551 Database connection details should be given using PG* environment variables.
552
553 =head1 DESCRIPTION
554
555 fast.dump is used to make very fast, although requiring special attention,
556 database dumps.
557
558 It works with PostgreSQL database, and will produce consistent dump only if
there are no other connections to the database (other than fast.dump itself).
560
561 Generated dumps have form of directory with many files inside, and should be
562 loaded using fast.restore counterpart tool.
563
Note: See TracBrowser for help on using the browser.