Changeset 19

Show
Ignore:
Timestamp:
08/26/09 15:23:09 (5 years ago)
Author:
jesus
Message:

some more replication work -- continuous pull is the only TODO (and testing all of it).

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/bin/corneactl

    r18 r19  
    1414use Switch; 
    1515 
     16sub exit_msg { 
     17  my $exit_code = shift; 
     18  my $message = shift || ($exit_code ? 'failure' : 'success'); 
     19  print "$message\n"; 
     20  exit($exit_code); 
     21} 
     22 
    1623my $conf_file; 
    1724GetOptions("c=s" => \$conf_file); 
     
    1926 
    2027my $cmd = shift; 
     28my $rt = Cornea::RecallTable->new(); 
     29 
     30sub usage { 
     31  print <<EOF; 
     32$0 [-c conf_file] <command> [args] 
     33 
     34Commands: 
     35 
     36        list-metanodes 
     37 
     38        init-metanode 
     39 
     40        init-peer-metanode <nodename> 
     41 
     42        first-sync-peer-metanode <nodename> 
     43 
     44        drop-peer-metanode <nodename> 
     45 
     46        mirror-metanode <nodename> 
     47 
     48        list-nodes 
     49 
     50        update-node [fqdn location] 
     51EOF 
     52} 
     53 
    2154switch($cmd) { 
     55  case 'list-metanodes' { 
     56    $rt->listAssetTables(); 
     57  } 
     58  case 'init-metanode' { 
     59    exit_msg($rt->initAssetTable()); 
     60  } 
     61  case 'init-peer-metanode' { 
     62    my $remote_node = shift or (usage() && exit -1); 
     63    exit_msg($rt->setupAssetQueue($remote_node)); 
     64  } 
     65  case 'first-sync-peer-metanode' { 
     66    my $remote_node = shift or (usage() && exit -1); 
     67    exit_msg($rt->initialAssetSynch($remote_node)); 
     68  } 
     69  case 'drop-peer-metanode' { 
     70    my $remote_node = shift; 
     71    exit_msg($rt->destroyAssetQueue($remote_node)); 
     72  } 
     73  case 'mirror-metanode' { 
     74    my $remote_node = shift; 
     75    $rt->pullAssetTable($remote_node); 
     76  } 
    2277  case 'list-nodes' { 
    23     my $rt = Cornea::RecallTable->new(); 
    2478    print Dump($rt->getNodes()); 
    2579  } 
     
    3488        unless eval { $ip eq inet_ntoa($addr[4]); }; 
    3589    } 
    36     my $rt = Cornea::RecallTable->new(); 
    3790    my ($total, $used) = Cornea::Utils::fsinfo($conf->get('Storage::base')."/."); 
    3891    my $min = $conf->get('Storage::minimum'); 
  • trunk/perl/lib/Cornea/RecallTable.pm

    r18 r19  
    3434  my $class = shift; 
    3535  my $self = bless { }, $class; 
    36   $self->__connect; 
    3736  $self; 
    3837} 
     
    233232} 
    234233 
     234sub initAssetTable { 
     235  my $self = shift; 
     236  my $config = Cornea::Config->new(); 
     237  my $host = $config->get('sysinfo::nodename'); 
     238  (my $tbl = $host) =~ s/\-/_/g; 
     239  $tbl =~ s/\..*//; 
     240  my $dbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea", 
     241                         $config->get("DB::user"), 
     242                         $config->get("DB::pass"), 
     243                         { PrintError => 0, RaiseError => 1, AutoCommit => 1 }, 
     244                        ); 
     245  $dbh->begin_work(); 
     246  eval { 
     247    $dbh->do("CREATE TABLE cornea.asset_$tbl 
     248                    (CONSTRAINT asset_${tbl}_pkey 
     249                        PRIMARY KEY (service_id, asset_id, representation_id)) 
     250                     INHERITS (cornea.asset)"); 
     251    $dbh->do("CREATE OR REPLACE FUNCTION cornea.make_asset(in_service_id integer, in_asset_id bigint, in_repid integer, in_storage_location integer[]) RETURNS void AS 'insert into asset_${tbl} (service_id, asset_id, representation_id, storage_location) VALUES (\$1, \$2, \$3, \$4);' LANGUAGE sql"); 
     252    $dbh->commit(); 
     253  }; 
     254  if($@) { 
     255    my $err = $@; 
     256    eval { $dbh->rollback; }; 
     257    return (-1, "already initialized") if $err =~ /already exists/; 
     258    return (-1, $err); 
     259  } 
     260  return 0; 
     261} 
     262 
     263sub setupAssetQueue { 
     264  my $self = shift; 
     265  my $config = Cornea::Config->new(); 
     266  my $host = shift; 
     267  gethostbyname($host) || die "could not resolve $host\n";; 
     268  (my $tbl = $host) =~ s/\-/_/g; 
     269  $tbl =~ s/\..*//; 
     270  my $phost = $config->get('sysinfo::nodename'); 
     271  (my $ptbl = $phost) =~ s/\-/_/g; 
     272  $ptbl =~ s/\..*//; 
     273  my $dbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea", 
     274                         $config->get("DB::user"), 
     275                         $config->get("DB::pass"), 
     276                         { PrintError => 0, RaiseError => 1, AutoCommit => 1 }, 
     277                        ); 
     278  $dbh->begin_work(); 
     279  eval { 
     280    $dbh->do("CREATE TABLE cornea.asset_$tbl 
     281                    (CONSTRAINT asset_${tbl}_uc 
     282                        PRIMARY KEY (service_id, asset_id, representation_id)) 
     283                     INHERITS (cornea.asset)"); 
     284    $dbh->do("CREATE TABLE cornea.asset_${tbl}_queue 
     285                     (LIKE cornea.asset 
     286                      EXCLUDING CONSTRAINTS 
     287                      EXCLUDING INDEXES)"); 
     288    $dbh->do(<<SQL); 
     289CREATE FUNCTION cornea.populate_asset_${tbl}_queue() RETURNS TRIGGER 
     290  AS ' 
     291DECLARE 
     292BEGIN 
     293  INSERT INTO cornea.asset_${tbl}_queue 
     294              (asset_id, service_id, representation_id, storage_location) 
     295       VALUES (NEW.asset_id, NEW.service_id, NEW.representation_id, 
     296               NEW.storage_location); 
     297  RETURN NEW; 
     298END 
     299' LANGUAGE plpgsql 
     300SQL 
     301    $dbh->do("CREATE TRIGGER asset_${tbl}_queue_trigger 
     302                AFTER INSERT OR UPDATE ON cornea.asset_${ptbl} 
     303                FOR EACH ROW 
     304                EXECUTE PROCEDURE cornea.populate_asset_${tbl}_queue()"); 
     305    $dbh->commit(); 
     306  }; 
     307  if($@) { 
     308    my $err = $@; 
     309    eval { $dbh->rollback; }; 
     310    return (-1, "init-metanode first") 
     311      if $err =~ /"cornea.asset_$ptbl" does not exist/; 
     312    return (-1, "already initialized") if $err =~ /already exists/; 
     313    die $err; 
     314  } 
     315  return 0; 
     316} 
     317 
     318sub destroyAssetQueue { 
     319  my $self = shift; 
     320  my $config = Cornea::Config->new(); 
     321  my $host = shift; 
     322  gethostbyname($host) || die "could not resolve $host\n";; 
     323  (my $tbl = $host) =~ s/\-/_/g; 
     324  $tbl =~ s/\..*//; 
     325  my $phost = $config->get('sysinfo::nodename'); 
     326  (my $ptbl = $phost) =~ s/\-/_/g; 
     327  $ptbl =~ s/\..*//; 
     328  my $dbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea", 
     329                         $config->get("DB::user"), 
     330                         $config->get("DB::pass"), 
     331                         { PrintError => 0, RaiseError => 1, AutoCommit => 1 }, 
     332                        ); 
     333  $dbh->begin_work(); 
     334  eval { 
     335    $dbh->do("DROP TRIGGER asset_${tbl}_queue_trigger ON cornea.asset_${ptbl}"); 
     336    $dbh->do("DROP FUNCTION cornea.populate_asset_${tbl}_queue()"); 
     337    $dbh->do("DROP TABLE cornea.asset_${tbl}_queue"); 
     338    $dbh->do("DROP TABLE cornea.asset_$tbl"); 
     339    $dbh->commit(); 
     340  }; 
     341  if($@) { 
     342    my $err = $@; 
     343    eval { $dbh->rollback; }; 
     344    return (-1, "already perfomed") if $err =~ /does not exist/; 
     345    return (-1, $err); 
     346  } 
     347  return 0; 
     348} 
     349 
     350sub initialAssetSynch { 
     351  my $self = shift; 
     352  my $config = Cornea::Config->new(); 
     353  my $host = shift; 
     354  gethostbyname($host) || die "could not resolve $host\n";; 
     355  (my $tbl = $host) =~ s/\-/_/g; 
     356  $tbl =~ s/\..*//; 
     357  my $phost = $config->get('sysinfo::nodename'); 
     358  (my $ptbl = $phost) =~ s/\-/_/g; 
     359  $ptbl =~ s/\..*//; 
     360  my $total_rows = 0; 
     361  my $dbh = DBI->connect("dbi:Pg:host=$host;dbname=cornea", 
     362                         $config->get("DB::user"), 
     363                         $config->get("DB::pass"), 
     364                         { PrintError => 0, RaiseError => 1, AutoCommit => 1 }, 
     365                        ); 
     366  my $ldbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea", 
     367                         $config->get("DB::user"), 
     368                         $config->get("DB::pass"), 
     369                         { PrintError => 0, RaiseError => 1, AutoCommit => 1 }, 
     370                        ); 
     371  $dbh->begin_work(); 
     372  $ldbh->begin_work(); 
     373  my $problem; 
     374  eval { 
     375    $problem = "peer not locally initialized"; 
     376    my $isth = $ldbh->prepare( 
     377        "INSERT INTO cornea.asset_${tbl} 
     378                    (asset_id, service_id, 
     379                     representation_id, storage_location) 
     380              VALUES (?,?,?,?::smallint[])"); 
     381    $problem = "remote not initialized"; 
     382    $dbh->do("DECLARE initpull CURSOR FOR 
     383               SELECT asset_id, service_id, 
     384                      representation_id, storage_location 
     385                 FROM cornea.asset_${tbl}"); 
     386    $problem = "remote peer not initialized"; 
     387    $dbh->do("TRUNCATE cornea.asset_${ptbl}_queue"); 
     388    $problem = "error pulling remote data"; 
     389    my $sth = $dbh->prepare("FETCH FORWARD 10000 FROM initipull"); 
     390    my $internal_rows_moved; 
     391    do { 
     392      $internal_rows_moved = 0; 
     393      $sth->execute(); 
     394      while(my @row = $sth->fetchrow()) { 
     395        $problem = "error insert local data"; 
     396        $isth->execute(@row); 
     397        $internal_rows_moved++; 
     398        $problem = "error pulling remote data"; 
     399      } 
     400      $total_rows += $internal_rows_moved; 
     401    } while($internal_rows_moved); 
     402    $dbh->do("CLOSE initpull"); 
     403    $dbh->commit(); 
     404    $ldbh->commit(); 
     405  }; 
     406  if($@) { 
     407    my $err = $@; 
     408    eval { $ldbh->rollback; }; 
     409    eval { $dbh->rollback; }; 
     410    return (-1, "$problem:\n$err"); 
     411  } 
     412  return (0, "$total_rows copied"); 
     413} 
    2354141;