| 1 |
package Cornea::RecallTable; |
|---|
| 2 |
use strict; |
|---|
| 3 |
use Cornea::Config; |
|---|
| 4 |
use Cornea::Utils; |
|---|
| 5 |
use DBI; |
|---|
| 6 |
|
|---|
| 7 |
sub __connect { |
|---|
| 8 |
my $self = shift; |
|---|
| 9 |
my $config = Cornea::Config->new(); |
|---|
| 10 |
my $dbh; |
|---|
| 11 |
my $failed_err = undef; |
|---|
| 12 |
my $dsns = $config->get_list("DB::dsn"); |
|---|
| 13 |
Cornea::Utils::shuffle($dsns); |
|---|
| 14 |
foreach my $dsn (@$dsns) { |
|---|
| 15 |
eval { |
|---|
| 16 |
$dbh = DBI->connect($dsn, |
|---|
| 17 |
$config->get("DB::user"), |
|---|
| 18 |
$config->get("DB::pass"), |
|---|
| 19 |
{ PrintError => 0, RaiseError => 1 }, |
|---|
| 20 |
); |
|---|
| 21 |
}; |
|---|
| 22 |
die "$dsn: $@\n" if $@; |
|---|
| 23 |
last unless $@; |
|---|
| 24 |
} |
|---|
| 25 |
print STDERR "$failed_err\n" if $failed_err; |
|---|
| 26 |
$self->{dbh} = $dbh; |
|---|
| 27 |
} |
|---|
| 28 |
sub __reconnect { |
|---|
| 29 |
my $self = shift; |
|---|
| 30 |
$self->{dbh} = undef; |
|---|
| 31 |
$self->__connect(); |
|---|
| 32 |
} |
|---|
| 33 |
sub new { |
|---|
| 34 |
my $class = shift; |
|---|
| 35 |
my $self = bless { }, $class; |
|---|
| 36 |
$self; |
|---|
| 37 |
} |
|---|
| 38 |
|
|---|
| 39 |
sub insert { |
|---|
| 40 |
my $self = shift; |
|---|
| 41 |
my ($serviceId, $assetId, $repId, $snl) = @_; |
|---|
| 42 |
my $tried = 0; |
|---|
| 43 |
die 'bad parameters' unless UNIVERSAL::ISA($snl, 'Cornea::StorageNodeList'); |
|---|
| 44 |
my $snl_arr = '{' . join(',', map { $_->storeagenodeid() } |
|---|
| 45 |
($snl->items())) . '}'; |
|---|
| 46 |
again: |
|---|
| 47 |
eval { |
|---|
| 48 |
my $sth = $self->{dbh}->prepare("select make_asset(?,?,?,?::int[])"); |
|---|
| 49 |
$sth->execute($serviceId, $assetId, $repId, $snl_arr); |
|---|
| 50 |
$sth->finish(); |
|---|
| 51 |
}; |
|---|
| 52 |
if ($@) { |
|---|
| 53 |
unless ($tried++) { $self->__reconnect(); goto again; } |
|---|
| 54 |
die $@ if $@; |
|---|
| 55 |
} |
|---|
| 56 |
return 1; |
|---|
| 57 |
} |
|---|
| 58 |
|
|---|
| 59 |
sub find { |
|---|
| 60 |
my $self = shift; |
|---|
| 61 |
my ($serviceId, $assetId, $repId) = @_; |
|---|
| 62 |
my $sth = $self->{dbh}->prepare("select get_asset_location(?,?,?)"); |
|---|
| 63 |
my $tried = 0; |
|---|
| 64 |
my $C; |
|---|
| 65 |
again: |
|---|
| 66 |
eval { |
|---|
| 67 |
$C = Cornea::StorageNodeList->new(); |
|---|
| 68 |
$sth->execute($serviceId, $assetId, $repId); |
|---|
| 69 |
while(my $node = $sth->fetchrow_hashref()) { |
|---|
| 70 |
$C->add(Cornea::StorageNode->new_from_row($node)); |
|---|
| 71 |
} |
|---|
| 72 |
$sth->finish(); |
|---|
| 73 |
}; |
|---|
| 74 |
if ($@) { |
|---|
| 75 |
unless ($tried++) { $self->__reconnect(); goto again; } |
|---|
| 76 |
die $@ if $@; |
|---|
| 77 |
} |
|---|
| 78 |
return $C; |
|---|
| 79 |
} |
|---|
| 80 |
|
|---|
| 81 |
sub getNodes { |
|---|
| 82 |
my $self = shift; |
|---|
| 83 |
my $type = shift; |
|---|
| 84 |
my $tried = 0; |
|---|
| 85 |
my $snl; |
|---|
| 86 |
again: |
|---|
| 87 |
eval { |
|---|
| 88 |
$snl = Cornea::StorageNodeList->new(); |
|---|
| 89 |
my $sth = $self->{dbh}->prepare("select * from get_storage_nodes_by_state(?)"); |
|---|
| 90 |
$sth->execute($type); |
|---|
| 91 |
while(my $row = $sth->fetchrow_hashref()) { |
|---|
| 92 |
$snl->add(Cornea::StorageNode->new_from_row($row)); |
|---|
| 93 |
} |
|---|
| 94 |
$sth->finish(); |
|---|
| 95 |
}; |
|---|
| 96 |
if ($@) { |
|---|
| 97 |
unless ($tried++) { $self->__reconnect(); goto again; } |
|---|
| 98 |
die $@ if $@; |
|---|
| 99 |
} |
|---|
| 100 |
return $snl; |
|---|
| 101 |
} |
|---|
| 102 |
|
|---|
| 103 |
sub _2pc_add_storage { |
|---|
| 104 |
my $ip = shift; |
|---|
| 105 |
my $attr = shift; |
|---|
| 106 |
my $storage_node_id; |
|---|
| 107 |
my $config = Cornea::Config->new(); |
|---|
| 108 |
my $dsns = $config->get_list("DB::dsn"); |
|---|
| 109 |
my @dbh = map { |
|---|
| 110 |
my $dbh = DBI->connect($_, |
|---|
| 111 |
$config->get("DB::user"), |
|---|
| 112 |
$config->get("DB::pass"), |
|---|
| 113 |
{ PrintError => 0, RaiseError => 1, AutoCommit => 1 } |
|---|
| 114 |
); |
|---|
| 115 |
$dbh->begin_work(); |
|---|
| 116 |
$dbh; |
|---|
| 117 |
} @$dsns; |
|---|
| 118 |
eval { |
|---|
| 119 |
foreach (@dbh) { |
|---|
| 120 |
eval { |
|---|
| 121 |
my $sth = $_->prepare("select set_storage_node(?,?,?,?,?,?,?)"); |
|---|
| 122 |
$sth->execute($attr->{state}, |
|---|
| 123 |
$attr->{total_storage}, $attr->{used_storage}, |
|---|
| 124 |
$attr->{location}, $attr->{fqdn}, $ip, $storage_node_id); |
|---|
| 125 |
my ($returned_storade_node_id) = $sth->fetchrow(); |
|---|
| 126 |
$storage_node_id ||= $returned_storade_node_id; |
|---|
| 127 |
$sth->finish(); |
|---|
| 128 |
die "Storage node trickery! (this should never happen).\n" |
|---|
| 129 |
if($storage_node_id != $returned_storade_node_id); |
|---|
| 130 |
}; |
|---|
| 131 |
if ($@) { |
|---|
| 132 |
die "location must be specified for first-time update\n" |
|---|
| 133 |
if $@ =~ /null value in column "location"/; |
|---|
| 134 |
die $@ if $@; |
|---|
| 135 |
} |
|---|
| 136 |
} |
|---|
| 137 |
foreach (@dbh) { $_->do("prepare transaction 'cornea_node'"); } |
|---|
| 138 |
foreach (@dbh) { $_->do("commit prepared 'cornea_node'"); } |
|---|
| 139 |
}; |
|---|
| 140 |
if ($@) { |
|---|
| 141 |
my $real_error = $@; |
|---|
| 142 |
$storage_node_id = undef; |
|---|
| 143 |
eval { foreach (@dbh) { $_->do("rollback prepared 'cornea_node'"); } }; |
|---|
| 144 |
die $real_error; |
|---|
| 145 |
} |
|---|
| 146 |
foreach (@dbh) { $_->disconnect; } |
|---|
| 147 |
die $@ unless($storage_node_id); |
|---|
| 148 |
return $storage_node_id; |
|---|
| 149 |
} |
|---|
| 150 |
sub updateNode { |
|---|
| 151 |
my $self = shift; |
|---|
| 152 |
my $ip = shift; |
|---|
| 153 |
my $attr = shift; |
|---|
| 154 |
my $config = Cornea::Config->new(); |
|---|
| 155 |
die "bad state" |
|---|
| 156 |
unless $attr->{state} =~ /^(?:open|closed|offline|decommissioned)$/; |
|---|
| 157 |
die "storage must be a number" |
|---|
| 158 |
unless $attr->{total_storage} =~ /^[1-9]\d*$/ and |
|---|
| 159 |
$attr->{used_storage} =~ /^[1-9]\d*$/; |
|---|
| 160 |
die "locaion must be dc/cage/row/rack/pdu" |
|---|
| 161 |
unless !defined($attr->{location}) or |
|---|
| 162 |
$attr->{location} =~ /^[^\/]+(?:\/[^\/]+){4}$/; |
|---|
| 163 |
die "fqdn must not be blank" |
|---|
| 164 |
unless !defined($attr->{fqdn}) or length($attr->{fqdn}); |
|---|
| 165 |
|
|---|
| 166 |
if(defined($attr->{location}) || defined($attr->{fqdn})) { |
|---|
| 167 |
return return _2pc_add_storage($ip, $attr); |
|---|
| 168 |
} |
|---|
| 169 |
my $dsns = $config->get_list("DB::dsn"); |
|---|
| 170 |
foreach (@$dsns) { |
|---|
| 171 |
my $dbh = DBI->connect($_, |
|---|
| 172 |
$config->get("DB::user"), |
|---|
| 173 |
$config->get("DB::pass"), |
|---|
| 174 |
{ PrintError => 0, RaiseError => 1, AutoCommit => 1 } |
|---|
| 175 |
); |
|---|
| 176 |
my $tried = 0; |
|---|
| 177 |
again: |
|---|
| 178 |
eval { |
|---|
| 179 |
my $sth = $self->{dbh}->prepare("select set_storage_node(?,?,?,?,?,?,?)"); |
|---|
| 180 |
$sth->execute($attr->{state}, |
|---|
| 181 |
$attr->{total_storage}, $attr->{used_storage}, |
|---|
| 182 |
$attr->{location}, $attr->{fqdn}, $ip, undef); |
|---|
| 183 |
$sth->finish(); |
|---|
| 184 |
}; |
|---|
| 185 |
if ($@) { |
|---|
| 186 |
unless ($tried++) { $self->__reconnect(); goto again; } |
|---|
| 187 |
print STDERR $@; |
|---|
| 188 |
} |
|---|
| 189 |
} |
|---|
| 190 |
return 0; |
|---|
| 191 |
} |
|---|
| 192 |
|
|---|
| 193 |
sub repInfo { |
|---|
| 194 |
my $self = shift; |
|---|
| 195 |
my ($serviceId, $repId) = @_; |
|---|
| 196 |
my $tried = 0; |
|---|
| 197 |
my $row; |
|---|
| 198 |
again: |
|---|
| 199 |
eval { |
|---|
| 200 |
my $sth = $self->{dbh}->prepare("select * from get_representation(?,?)"); |
|---|
| 201 |
$sth->execute($serviceId, $repId); |
|---|
| 202 |
$row = $sth->fetchrow_hashref(); |
|---|
| 203 |
$sth->finish(); |
|---|
| 204 |
}; |
|---|
| 205 |
if ($@) { |
|---|
| 206 |
unless ($tried++) { $self->__reconnect(); goto again; } |
|---|
| 207 |
die $@ if $@; |
|---|
| 208 |
} |
|---|
| 209 |
return Cornea::RepresentationInfo->new_from_row($row); |
|---|
| 210 |
} |
|---|
| 211 |
|
|---|
| 212 |
sub repInfoDependents { |
|---|
| 213 |
my $self = shift; |
|---|
| 214 |
my ($serviceId, $repId) = @_; |
|---|
| 215 |
my $tried = 0; |
|---|
| 216 |
my @deps; |
|---|
| 217 |
again: |
|---|
| 218 |
eval { |
|---|
| 219 |
@deps = (); |
|---|
| 220 |
my $sth = $self->{dbh}->prepare("select * from get_representation_dependents(?,?)"); |
|---|
| 221 |
$sth->execute($serviceId, $repId); |
|---|
| 222 |
while(my $row = $sth->fetchrow_hashref()) { |
|---|
| 223 |
push @deps, Cornea::RepresentationInfo->new_from_row($row); |
|---|
| 224 |
} |
|---|
| 225 |
$sth->finish(); |
|---|
| 226 |
}; |
|---|
| 227 |
if ($@) { |
|---|
| 228 |
unless ($tried++) { $self->__reconnect(); goto again; } |
|---|
| 229 |
die $@ if $@; |
|---|
| 230 |
} |
|---|
| 231 |
return @deps; |
|---|
| 232 |
} |
|---|
| 233 |
|
|---|
| 234 |
sub initAssetTable { |
|---|
| 235 |
my $self = shift; |
|---|
| 236 |
my $config = Cornea::Config->new(); |
|---|
| 237 |
my $host = $config->get('sysinfo::nodename'); |
|---|
| 238 |
(my $tbl = $host) =~ s/\-/_/g; |
|---|
| 239 |
$tbl =~ s/\..*//; |
|---|
| 240 |
my $dbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea", |
|---|
| 241 |
$config->get("DB::user"), |
|---|
| 242 |
$config->get("DB::pass"), |
|---|
| 243 |
{ PrintError => 0, RaiseError => 1, AutoCommit => 1 }, |
|---|
| 244 |
); |
|---|
| 245 |
$dbh->begin_work(); |
|---|
| 246 |
eval { |
|---|
| 247 |
$dbh->do("CREATE TABLE cornea.asset_$tbl |
|---|
| 248 |
(CONSTRAINT asset_${tbl}_pkey |
|---|
| 249 |
PRIMARY KEY (service_id, asset_id, representation_id)) |
|---|
| 250 |
INHERITS (cornea.asset)"); |
|---|
| 251 |
$dbh->do("CREATE OR REPLACE FUNCTION cornea.make_asset(in_service_id integer, in_asset_id bigint, in_repid integer, in_storage_location smallint[]) RETURNS void AS 'delete from asset where service_id = \$1 and asset_id = \$2 and representation_id = \$3; insert into asset_${tbl} (service_id, asset_id, representation_id, storage_location) VALUES (\$1, \$2, \$3, \$4);' LANGUAGE sql"); |
|---|
| 252 |
$dbh->commit(); |
|---|
| 253 |
}; |
|---|
| 254 |
if($@) { |
|---|
| 255 |
my $err = $@; |
|---|
| 256 |
eval { $dbh->rollback; }; |
|---|
| 257 |
return (-1, "already initialized") if $err =~ /already exists/; |
|---|
| 258 |
return (-1, $err); |
|---|
| 259 |
} |
|---|
| 260 |
return 0; |
|---|
| 261 |
} |
|---|
| 262 |
|
|---|
| 263 |
sub setupAssetQueue { |
|---|
| 264 |
my $self = shift; |
|---|
| 265 |
my $config = Cornea::Config->new(); |
|---|
| 266 |
my $host = shift; |
|---|
| 267 |
gethostbyname($host) || die "could not resolve $host\n";; |
|---|
| 268 |
(my $tbl = $host) =~ s/\-/_/g; |
|---|
| 269 |
$tbl =~ s/\..*//; |
|---|
| 270 |
my $phost = $config->get('sysinfo::nodename'); |
|---|
| 271 |
(my $ptbl = $phost) =~ s/\-/_/g; |
|---|
| 272 |
$ptbl =~ s/\..*//; |
|---|
| 273 |
my $dbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea", |
|---|
| 274 |
$config->get("DB::user"), |
|---|
| 275 |
$config->get("DB::pass"), |
|---|
| 276 |
{ PrintError => 0, RaiseError => 1, AutoCommit => 1 }, |
|---|
| 277 |
); |
|---|
| 278 |
$dbh->begin_work(); |
|---|
| 279 |
eval { |
|---|
| 280 |
$dbh->do("CREATE TABLE cornea.asset_$tbl |
|---|
| 281 |
(CONSTRAINT asset_${tbl}_uc |
|---|
| 282 |
PRIMARY KEY (service_id, asset_id, representation_id)) |
|---|
| 283 |
INHERITS (cornea.asset)"); |
|---|
| 284 |
$dbh->do("CREATE TABLE cornea.asset_${tbl}_queue |
|---|
| 285 |
(LIKE cornea.asset |
|---|
| 286 |
EXCLUDING CONSTRAINTS |
|---|
| 287 |
EXCLUDING INDEXES)"); |
|---|
| 288 |
$dbh->do(<<SQL); |
|---|
| 289 |
CREATE FUNCTION cornea.populate_asset_${tbl}_queue() RETURNS TRIGGER |
|---|
| 290 |
AS ' |
|---|
| 291 |
DECLARE |
|---|
| 292 |
BEGIN |
|---|
| 293 |
INSERT INTO cornea.asset_${tbl}_queue |
|---|
| 294 |
(asset_id, service_id, representation_id, storage_location) |
|---|
| 295 |
VALUES (NEW.asset_id, NEW.service_id, NEW.representation_id, |
|---|
| 296 |
NEW.storage_location); |
|---|
| 297 |
RETURN NEW; |
|---|
| 298 |
END |
|---|
| 299 |
' LANGUAGE plpgsql |
|---|
| 300 |
SQL |
|---|
| 301 |
$dbh->do("CREATE TRIGGER asset_${tbl}_queue_trigger |
|---|
| 302 |
AFTER INSERT OR UPDATE ON cornea.asset_${ptbl} |
|---|
| 303 |
FOR EACH ROW |
|---|
| 304 |
EXECUTE PROCEDURE cornea.populate_asset_${tbl}_queue()"); |
|---|
| 305 |
$dbh->commit(); |
|---|
| 306 |
}; |
|---|
| 307 |
if($@) { |
|---|
| 308 |
my $err = $@; |
|---|
| 309 |
eval { $dbh->rollback; }; |
|---|
| 310 |
return (-1, "init-metanode first") |
|---|
| 311 |
if $err =~ /"cornea.asset_$ptbl" does not exist/; |
|---|
| 312 |
return (-1, "already initialized") if $err =~ /already exists/; |
|---|
| 313 |
die $err; |
|---|
| 314 |
} |
|---|
| 315 |
return 0; |
|---|
| 316 |
} |
|---|
| 317 |
|
|---|
| 318 |
sub destroyAssetQueue { |
|---|
| 319 |
my $self = shift; |
|---|
| 320 |
my $config = Cornea::Config->new(); |
|---|
| 321 |
my $host = shift; |
|---|
| 322 |
gethostbyname($host) || die "could not resolve $host\n";; |
|---|
| 323 |
(my $tbl = $host) =~ s/\-/_/g; |
|---|
| 324 |
$tbl =~ s/\..*//; |
|---|
| 325 |
my $phost = $config->get('sysinfo::nodename'); |
|---|
| 326 |
(my $ptbl = $phost) =~ s/\-/_/g; |
|---|
| 327 |
$ptbl =~ s/\..*//; |
|---|
| 328 |
my $dbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea", |
|---|
| 329 |
$config->get("DB::user"), |
|---|
| 330 |
$config->get("DB::pass"), |
|---|
| 331 |
{ PrintError => 0, RaiseError => 1, AutoCommit => 1 }, |
|---|
| 332 |
); |
|---|
| 333 |
$dbh->begin_work(); |
|---|
| 334 |
eval { |
|---|
| 335 |
$dbh->do("DROP TRIGGER asset_${tbl}_queue_trigger ON cornea.asset_${ptbl}"); |
|---|
| 336 |
$dbh->do("DROP FUNCTION cornea.populate_asset_${tbl}_queue()"); |
|---|
| 337 |
$dbh->do("DROP TABLE cornea.asset_${tbl}_queue"); |
|---|
| 338 |
$dbh->do("DROP TABLE cornea.asset_$tbl"); |
|---|
| 339 |
$dbh->commit(); |
|---|
| 340 |
}; |
|---|
| 341 |
if($@) { |
|---|
| 342 |
my $err = $@; |
|---|
| 343 |
eval { $dbh->rollback; }; |
|---|
| 344 |
return (-1, "already perfomed") if $err =~ /does not exist/; |
|---|
| 345 |
return (-1, $err); |
|---|
| 346 |
} |
|---|
| 347 |
return 0; |
|---|
| 348 |
} |
|---|
| 349 |
|
|---|
| 350 |
sub initialAssetSynch { |
|---|
| 351 |
my $self = shift; |
|---|
| 352 |
my $config = Cornea::Config->new(); |
|---|
| 353 |
my $host = shift; |
|---|
| 354 |
gethostbyname($host) || die "could not resolve $host\n";; |
|---|
| 355 |
(my $tbl = $host) =~ s/\-/_/g; |
|---|
| 356 |
$tbl =~ s/\..*//; |
|---|
| 357 |
my $phost = $config->get('sysinfo::nodename'); |
|---|
| 358 |
(my $ptbl = $phost) =~ s/\-/_/g; |
|---|
| 359 |
$ptbl =~ s/\..*//; |
|---|
| 360 |
my $total_rows = 0; |
|---|
| 361 |
my $dbh = DBI->connect("dbi:Pg:host=$host;dbname=cornea", |
|---|
| 362 |
$config->get("DB::user"), |
|---|
| 363 |
$config->get("DB::pass"), |
|---|
| 364 |
{ PrintError => 0, RaiseError => 1, AutoCommit => 1 }, |
|---|
| 365 |
); |
|---|
| 366 |
my $ldbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea", |
|---|
| 367 |
$config->get("DB::user"), |
|---|
| 368 |
$config->get("DB::pass"), |
|---|
| 369 |
{ PrintError => 0, RaiseError => 1, AutoCommit => 1 }, |
|---|
| 370 |
); |
|---|
| 371 |
$dbh->begin_work(); |
|---|
| 372 |
$ldbh->begin_work(); |
|---|
| 373 |
my $problem; |
|---|
| 374 |
eval { |
|---|
| 375 |
$problem = "peer not locally initialized"; |
|---|
| 376 |
my $isth = $ldbh->prepare( |
|---|
| 377 |
"INSERT INTO cornea.asset_${tbl} |
|---|
| 378 |
(asset_id, service_id, |
|---|
| 379 |
representation_id, storage_location) |
|---|
| 380 |
VALUES (?,?,?,?::smallint[])"); |
|---|
| 381 |
$problem = "remote not initialized"; |
|---|
| 382 |
$dbh->do("DECLARE initpull CURSOR FOR |
|---|
| 383 |
SELECT asset_id, service_id, |
|---|
| 384 |
representation_id, storage_location |
|---|
| 385 |
FROM cornea.asset_${tbl}"); |
|---|
| 386 |
$problem = "remote peer not initialized"; |
|---|
| 387 |
$dbh->do("TRUNCATE cornea.asset_${ptbl}_queue"); |
|---|
| 388 |
$problem = "error pulling remote data"; |
|---|
| 389 |
my $sth = $dbh->prepare("FETCH FORWARD 10000 FROM initpull"); |
|---|
| 390 |
my $internal_rows_moved; |
|---|
| 391 |
do { |
|---|
| 392 |
$internal_rows_moved = 0; |
|---|
| 393 |
$sth->execute(); |
|---|
| 394 |
while(my @row = $sth->fetchrow()) { |
|---|
| 395 |
$problem = "error insert local data"; |
|---|
| 396 |
$isth->execute(@row); |
|---|
| 397 |
$internal_rows_moved++; |
|---|
| 398 |
$problem = "error pulling remote data"; |
|---|
| 399 |
} |
|---|
| 400 |
$total_rows += $internal_rows_moved; |
|---|
| 401 |
} while($internal_rows_moved); |
|---|
| 402 |
$dbh->do("CLOSE initpull"); |
|---|
| 403 |
$dbh->commit(); |
|---|
| 404 |
$ldbh->commit(); |
|---|
| 405 |
}; |
|---|
| 406 |
if($@) { |
|---|
| 407 |
my $err = $@; |
|---|
| 408 |
eval { $ldbh->rollback; }; |
|---|
| 409 |
eval { $dbh->rollback; }; |
|---|
| 410 |
return (-1, "$problem:\n$err"); |
|---|
| 411 |
} |
|---|
| 412 |
return (0, "$total_rows copied"); |
|---|
| 413 |
} |
|---|
| 414 |
sub pullAssetTable { |
|---|
| 415 |
my $self = shift; |
|---|
| 416 |
my $config = Cornea::Config->new(); |
|---|
| 417 |
my $host = shift; |
|---|
| 418 |
my $timeout = shift || 1; |
|---|
| 419 |
gethostbyname($host) || die "could not resolve $host\n";; |
|---|
| 420 |
(my $tbl = $host) =~ s/\-/_/g; |
|---|
| 421 |
$tbl =~ s/\..*//; |
|---|
| 422 |
my $phost = $config->get('sysinfo::nodename'); |
|---|
| 423 |
(my $ptbl = $phost) =~ s/\-/_/g; |
|---|
| 424 |
$ptbl =~ s/\..*//; |
|---|
| 425 |
my $total_rows = 0; |
|---|
| 426 |
my $dbh = DBI->connect("dbi:Pg:host=$host;dbname=cornea", |
|---|
| 427 |
$config->get("DB::user"), |
|---|
| 428 |
$config->get("DB::pass"), |
|---|
| 429 |
{ PrintError => 0, RaiseError => 1, AutoCommit => 1 }, |
|---|
| 430 |
); |
|---|
| 431 |
my $ldbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea", |
|---|
| 432 |
$config->get("DB::user"), |
|---|
| 433 |
$config->get("DB::pass"), |
|---|
| 434 |
{ PrintError => 0, RaiseError => 1, AutoCommit => 1 }, |
|---|
| 435 |
); |
|---|
| 436 |
while(1) { |
|---|
| 437 |
$dbh->begin_work(); |
|---|
| 438 |
$ldbh->begin_work(); |
|---|
| 439 |
eval { |
|---|
| 440 |
my $from = $dbh->prepare("DELETE FROM cornea.asset_${ptbl}_queue RETURNING *"); |
|---|
| 441 |
my $todel = $ldbh->prepare("DELETE FROM cornea.asset |
|---|
| 442 |
WHERE asset_id = ? and service_id = ? and representation_id = ?"); |
|---|
| 443 |
my $toins = $ldbh->prepare( |
|---|
| 444 |
"INSERT INTO cornea.asset_${tbl} |
|---|
| 445 |
(asset_id, service_id, |
|---|
| 446 |
representation_id, storage_location) |
|---|
| 447 |
VALUES (?,?,?,?::smallint[])"); |
|---|
| 448 |
$from->execute(); |
|---|
| 449 |
while(my @row = $from->fetchrow()) { |
|---|
| 450 |
$todel->execute(@row[0..2]); |
|---|
| 451 |
$toins->execute(@row); |
|---|
| 452 |
$total_rows++; |
|---|
| 453 |
} |
|---|
| 454 |
$ldbh->commit; |
|---|
| 455 |
$dbh->commit; |
|---|
| 456 |
}; |
|---|
| 457 |
if($@) { |
|---|
| 458 |
my $err = $@; |
|---|
| 459 |
eval { $ldbh->rollback; }; |
|---|
| 460 |
eval { $dbh->rollback; }; |
|---|
| 461 |
return (-1, "$total_rows compied.\n$err"); |
|---|
| 462 |
} |
|---|
| 463 |
sleep($timeout); |
|---|
| 464 |
} |
|---|
| 465 |
return (0, "$total_rows copied"); |
|---|
| 466 |
} |
|---|
| 467 |
1; |
|---|