| 1 |
package Cornea::RecallTable; |
|---|
| 2 |
use strict; |
|---|
| 3 |
use Cornea::Config; |
|---|
| 4 |
use Cornea::Utils; |
|---|
| 5 |
use YAML; |
|---|
| 6 |
use DBI; |
|---|
| 7 |
|
|---|
| 8 |
sub __connect { |
|---|
| 9 |
my $self = shift; |
|---|
| 10 |
my $config = Cornea::Config->new(); |
|---|
| 11 |
my $dbh; |
|---|
| 12 |
my $failed_err = undef; |
|---|
| 13 |
my $dsns = $config->get_list("DB::dsn"); |
|---|
| 14 |
Cornea::Utils::shuffle($dsns); |
|---|
| 15 |
foreach my $dsn (@$dsns) { |
|---|
| 16 |
eval { |
|---|
| 17 |
$dbh = DBI->connect($dsn, |
|---|
| 18 |
$config->get("DB::user"), |
|---|
| 19 |
$config->get("DB::pass"), |
|---|
| 20 |
{ PrintError => 0, RaiseError => 1 }, |
|---|
| 21 |
); |
|---|
| 22 |
}; |
|---|
| 23 |
die "$dsn: $@\n" if $@; |
|---|
| 24 |
last unless $@; |
|---|
| 25 |
} |
|---|
| 26 |
print STDERR "$failed_err\n" if $failed_err; |
|---|
| 27 |
$self->{dbh} = $dbh; |
|---|
| 28 |
} |
|---|
| 29 |
sub __reconnect { |
|---|
| 30 |
my $self = shift; |
|---|
| 31 |
$self->{dbh} = undef; |
|---|
| 32 |
$self->__connect(); |
|---|
| 33 |
} |
|---|
| 34 |
sub new { |
|---|
| 35 |
my $class = shift; |
|---|
| 36 |
my $self = bless { }, $class; |
|---|
| 37 |
$self; |
|---|
| 38 |
} |
|---|
| 39 |
|
|---|
| 40 |
sub insert { |
|---|
| 41 |
my $self = shift; |
|---|
| 42 |
my ($serviceId, $assetId, $repId, $snl) = @_; |
|---|
| 43 |
my $tried = 0; |
|---|
| 44 |
die 'bad parameters' unless UNIVERSAL::isa($snl, 'Cornea::StorageNodeList'); |
|---|
| 45 |
my $snl_arr = '{' . join(',', map { $_->id() } |
|---|
| 46 |
($snl->items())) . '}'; |
|---|
| 47 |
again: |
|---|
| 48 |
eval { |
|---|
| 49 |
my $sth = $self->{dbh}->prepare("select make_asset(?,?,?,?::smallint[])"); |
|---|
| 50 |
$sth->execute($serviceId, $assetId, $repId, $snl_arr); |
|---|
| 51 |
$sth->finish(); |
|---|
| 52 |
}; |
|---|
| 53 |
if ($@) { |
|---|
| 54 |
unless ($tried++) { $self->__reconnect(); goto again; } |
|---|
| 55 |
die $@ if $@; |
|---|
| 56 |
} |
|---|
| 57 |
return 1; |
|---|
| 58 |
} |
|---|
| 59 |
|
|---|
| 60 |
sub find { |
|---|
| 61 |
my $self = shift; |
|---|
| 62 |
my ($serviceId, $assetId, $repId) = @_; |
|---|
| 63 |
my $sth = $self->{dbh}->prepare("select * from get_asset_location(?,?,?)"); |
|---|
| 64 |
my $tried = 0; |
|---|
| 65 |
my $C; |
|---|
| 66 |
again: |
|---|
| 67 |
eval { |
|---|
| 68 |
$C = Cornea::StorageNodeList->new(); |
|---|
| 69 |
$sth->execute($serviceId, $assetId, $repId); |
|---|
| 70 |
while(my $node = $sth->fetchrow_hashref()) { |
|---|
| 71 |
$C->add(Cornea::StorageNode->new_from_row($node)); |
|---|
| 72 |
} |
|---|
| 73 |
$sth->finish(); |
|---|
| 74 |
}; |
|---|
| 75 |
if ($@) { |
|---|
| 76 |
unless ($tried++) { $self->__reconnect(); goto again; } |
|---|
| 77 |
die $@ if $@; |
|---|
| 78 |
} |
|---|
| 79 |
return $C; |
|---|
| 80 |
} |
|---|
| 81 |
|
|---|
| 82 |
sub getNodes { |
|---|
| 83 |
my $self = shift; |
|---|
| 84 |
my $type = shift; |
|---|
| 85 |
my $tried = 0; |
|---|
| 86 |
my $snl; |
|---|
| 87 |
my $bind = undef; |
|---|
| 88 |
if (defined($type)) { |
|---|
| 89 |
$bind = (ref $type eq 'ARRAY') ? ('{'.join(',', @$type).'}') : "{$type}"; |
|---|
| 90 |
} |
|---|
| 91 |
again: |
|---|
| 92 |
eval { |
|---|
| 93 |
$snl = Cornea::StorageNodeList->new(); |
|---|
| 94 |
my $sth = $self->{dbh}->prepare("select * from get_storage_nodes(?::storagestate[])"); |
|---|
| 95 |
$sth->execute($bind); |
|---|
| 96 |
while(my $row = $sth->fetchrow_hashref()) { |
|---|
| 97 |
$snl->add(Cornea::StorageNode->new_from_row($row)); |
|---|
| 98 |
} |
|---|
| 99 |
$sth->finish(); |
|---|
| 100 |
}; |
|---|
| 101 |
if ($@) { |
|---|
| 102 |
unless ($tried++) { $self->__reconnect(); goto again; } |
|---|
| 103 |
die $@ if $@; |
|---|
| 104 |
} |
|---|
| 105 |
return $snl; |
|---|
| 106 |
} |
|---|
| 107 |
|
|---|
| 108 |
sub _2pc_generic { |
|---|
| 109 |
my $self = shift; |
|---|
| 110 |
my $code = shift; |
|---|
| 111 |
my $config = Cornea::Config->new(); |
|---|
| 112 |
my $dsns = $config->get_list("DB::dsn"); |
|---|
| 113 |
my $rv = undef; |
|---|
| 114 |
my $named_txn = "cornea_$$"; |
|---|
| 115 |
my @dbh = map { |
|---|
| 116 |
my $dbh = DBI->connect($_, |
|---|
| 117 |
$config->get("DB::user"), |
|---|
| 118 |
$config->get("DB::pass"), |
|---|
| 119 |
{ PrintError => 0, RaiseError => 1, AutoCommit => 1 } |
|---|
| 120 |
); |
|---|
| 121 |
$dbh->begin_work(); |
|---|
| 122 |
[$_, $dbh]; |
|---|
| 123 |
} @$dsns; |
|---|
| 124 |
eval { |
|---|
| 125 |
foreach (@dbh) { |
|---|
| 126 |
&$code($_->[0], $_->[1], \$rv); |
|---|
| 127 |
} |
|---|
| 128 |
foreach (@dbh) { $_->[1]->do("prepare transaction '$named_txn'"); } |
|---|
| 129 |
foreach (@dbh) { $_->[1]->do("commit prepared '$named_txn'"); } |
|---|
| 130 |
}; |
|---|
| 131 |
if ($@) { |
|---|
| 132 |
my $real_error = $@; |
|---|
| 133 |
$rv = undef; |
|---|
| 134 |
eval { foreach (@dbh) { $_->[1]->do("rollback prepared '$named_txn'"); } }; |
|---|
| 135 |
die $real_error; |
|---|
| 136 |
} |
|---|
| 137 |
foreach (@dbh) { $_->[1]->disconnect; } |
|---|
| 138 |
return $rv; |
|---|
| 139 |
} |
|---|
| 140 |
|
|---|
| 141 |
sub updateNode { |
|---|
| 142 |
my $self = shift; |
|---|
| 143 |
my $ip = shift; |
|---|
| 144 |
my $attr = shift; |
|---|
| 145 |
my $config = Cornea::Config->new(); |
|---|
| 146 |
die "bad state" |
|---|
| 147 |
unless $attr->{state} =~ /^(?:open|closed|offline|decommissioned)$/; |
|---|
| 148 |
die "storage must be a number" |
|---|
| 149 |
unless $attr->{total_storage} =~ /^[1-9]\d*$/ and |
|---|
| 150 |
$attr->{used_storage} =~ /^[1-9]\d*$/; |
|---|
| 151 |
die "locaion must be dc/cage/row/rack/pdu" |
|---|
| 152 |
unless !defined($attr->{location}) or |
|---|
| 153 |
$attr->{location} =~ /^[^\/]+(?:\/[^\/]+){4}$/; |
|---|
| 154 |
die "fqdn must not be blank" |
|---|
| 155 |
unless !defined($attr->{fqdn}) or length($attr->{fqdn}); |
|---|
| 156 |
|
|---|
| 157 |
if(defined($attr->{location}) || defined($attr->{fqdn})) { |
|---|
| 158 |
return $self->_2pc_generic(sub { |
|---|
| 159 |
eval { |
|---|
| 160 |
my $dsn = shift; |
|---|
| 161 |
my $dbh = shift; |
|---|
| 162 |
my $rv = shift; |
|---|
| 163 |
my $sth = $dbh->prepare("select set_storage_node(?,?,?,?,?,?,?)"); |
|---|
| 164 |
my $storage_node_id = $$rv; |
|---|
| 165 |
$sth->execute($attr->{state}, |
|---|
| 166 |
$attr->{total_storage}, $attr->{used_storage}, |
|---|
| 167 |
$attr->{location}, $attr->{fqdn}, $ip, $storage_node_id); |
|---|
| 168 |
my ($returned_storade_node_id) = $sth->fetchrow(); |
|---|
| 169 |
$$rv ||= $returned_storade_node_id; |
|---|
| 170 |
$sth->finish(); |
|---|
| 171 |
die "Storage node trickery! (this should never happen).\n" |
|---|
| 172 |
if($$rv != $returned_storade_node_id); |
|---|
| 173 |
}; |
|---|
| 174 |
if ($@) { |
|---|
| 175 |
die "location and fqdn must be specified for first-time update\n" |
|---|
| 176 |
if $@ =~ /null value in column "(?:location|fqdn)"/; |
|---|
| 177 |
die $@ if $@; |
|---|
| 178 |
} |
|---|
| 179 |
}); |
|---|
| 180 |
} |
|---|
| 181 |
my $dsns = $config->get_list("DB::dsn"); |
|---|
| 182 |
foreach (@$dsns) { |
|---|
| 183 |
my $dbh = DBI->connect($_, |
|---|
| 184 |
$config->get("DB::user"), |
|---|
| 185 |
$config->get("DB::pass"), |
|---|
| 186 |
{ PrintError => 0, RaiseError => 1, AutoCommit => 1 } |
|---|
| 187 |
); |
|---|
| 188 |
my $tried = 0; |
|---|
| 189 |
again: |
|---|
| 190 |
eval { |
|---|
| 191 |
print STDERR "$_ -> updating node info\n" if $main::DEBUG; |
|---|
| 192 |
my $sth = $dbh->prepare("select set_storage_node(?,?,?,?,?,?,?)"); |
|---|
| 193 |
$sth->execute($attr->{state}, |
|---|
| 194 |
$attr->{total_storage}, $attr->{used_storage}, |
|---|
| 195 |
$attr->{location}, $attr->{fqdn}, $ip, undef); |
|---|
| 196 |
$sth->finish(); |
|---|
| 197 |
}; |
|---|
| 198 |
if ($@) { |
|---|
| 199 |
my $err = $@; |
|---|
| 200 |
$err = "failed: no match by IP\n" |
|---|
| 201 |
if $@ =~ /null value in column "(?:location|fqdn)"/; |
|---|
| 202 |
print STDERR "\t$@\n" if $main::DEBUG; |
|---|
| 203 |
unless ($tried++) { $self->__reconnect(); goto again; } |
|---|
| 204 |
print STDERR "$_: $err"; |
|---|
| 205 |
} |
|---|
| 206 |
} |
|---|
| 207 |
return 0; |
|---|
| 208 |
} |
|---|
| 209 |
|
|---|
| 210 |
sub updateRepInfo { |
|---|
| 211 |
my $self = shift; |
|---|
| 212 |
my $service_id = shift; |
|---|
| 213 |
my $rep_id = shift; |
|---|
| 214 |
my $attr = shift; |
|---|
| 215 |
eval "use $attr->{class};"; |
|---|
| 216 |
return (-1, "Cannot load $attr->{class}") if $@; |
|---|
| 217 |
return $self->_2pc_generic(sub { |
|---|
| 218 |
my $dsn = shift; |
|---|
| 219 |
my $dbh = shift; |
|---|
| 220 |
my $rv = shift; |
|---|
| 221 |
my $sth = $dbh->prepare("select make_representation(?,?,?,?,?,?,?)"); |
|---|
| 222 |
$sth->execute($service_id, $rep_id, $attr->{name}, |
|---|
| 223 |
$attr->{distance}, $attr->{count}, |
|---|
| 224 |
$attr->{parent}, $attr->{class}); |
|---|
| 225 |
$sth->finish(); |
|---|
| 226 |
}); |
|---|
| 227 |
} |
|---|
| 228 |
|
|---|
| 229 |
sub repInfo { |
|---|
| 230 |
my $self = shift; |
|---|
| 231 |
my ($serviceId, $repId) = @_; |
|---|
| 232 |
my $tried = 0; |
|---|
| 233 |
my $row; |
|---|
| 234 |
again: |
|---|
| 235 |
eval { |
|---|
| 236 |
my $sth = $self->{dbh}->prepare("select * from get_representation(?,?)"); |
|---|
| 237 |
$sth->execute($serviceId, $repId); |
|---|
| 238 |
$row = $sth->fetchrow_hashref(); |
|---|
| 239 |
$sth->finish(); |
|---|
| 240 |
}; |
|---|
| 241 |
if ($@) { |
|---|
| 242 |
unless ($tried++) { $self->__reconnect(); goto again; } |
|---|
| 243 |
die $@ if $@; |
|---|
| 244 |
} |
|---|
| 245 |
return $row ? Cornea::RepresentationInfo->new_from_row($row) : undef; |
|---|
| 246 |
} |
|---|
| 247 |
|
|---|
| 248 |
sub repInfoDependents { |
|---|
| 249 |
my $self = shift; |
|---|
| 250 |
my ($serviceId, $repId) = @_; |
|---|
| 251 |
my $tried = 0; |
|---|
| 252 |
my @deps; |
|---|
| 253 |
again: |
|---|
| 254 |
eval { |
|---|
| 255 |
@deps = (); |
|---|
| 256 |
my $sth = $self->{dbh}->prepare("select * from get_representation_dependents(?,?)"); |
|---|
| 257 |
$sth->execute($serviceId, $repId); |
|---|
| 258 |
while(my $row = $sth->fetchrow_hashref()) { |
|---|
| 259 |
push @deps, Cornea::RepresentationInfo->new_from_row($row); |
|---|
| 260 |
} |
|---|
| 261 |
$sth->finish(); |
|---|
| 262 |
}; |
|---|
| 263 |
if ($@) { |
|---|
| 264 |
unless ($tried++) { $self->__reconnect(); goto again; } |
|---|
| 265 |
die $@ if $@; |
|---|
| 266 |
} |
|---|
| 267 |
return @deps; |
|---|
| 268 |
} |
|---|
| 269 |
|
|---|
| 270 |
sub initAssetTable { |
|---|
| 271 |
my $self = shift; |
|---|
| 272 |
my $config = Cornea::Config->new(); |
|---|
| 273 |
my $host = $config->get('sysinfo::nodename'); |
|---|
| 274 |
(my $tbl = $host) =~ s/\-/_/g; |
|---|
| 275 |
$tbl =~ s/\..*//; |
|---|
| 276 |
my $dbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea", |
|---|
| 277 |
$config->get("DB::user"), |
|---|
| 278 |
$config->get("DB::pass"), |
|---|
| 279 |
{ PrintError => 0, RaiseError => 1, AutoCommit => 1 }, |
|---|
| 280 |
); |
|---|
| 281 |
$dbh->do("set client_min_messages = 'WARNING'"); |
|---|
| 282 |
$dbh->begin_work(); |
|---|
| 283 |
eval { |
|---|
| 284 |
$dbh->do("CREATE TABLE cornea.asset_$tbl |
|---|
| 285 |
(CONSTRAINT asset_${tbl}_pkey |
|---|
| 286 |
PRIMARY KEY (service_id, asset_id, representation_id)) |
|---|
| 287 |
INHERITS (cornea.asset)"); |
|---|
| 288 |
$dbh->do("CREATE OR REPLACE FUNCTION cornea.make_asset(in_service_id integer, in_asset_id bigint, in_repid integer, in_storage_location smallint[]) RETURNS void AS 'delete from asset where service_id = \$1 and asset_id = \$2 and representation_id = \$3; insert into asset_${tbl} (service_id, asset_id, representation_id, storage_location) VALUES (\$1, \$2, \$3, \$4);' LANGUAGE sql"); |
|---|
| 289 |
$dbh->commit(); |
|---|
| 290 |
}; |
|---|
| 291 |
if($@) { |
|---|
| 292 |
my $err = $@; |
|---|
| 293 |
eval { $dbh->rollback; }; |
|---|
| 294 |
return (-1, "already initialized") if $err =~ /already exists/; |
|---|
| 295 |
return (-1, $err); |
|---|
| 296 |
} |
|---|
| 297 |
return 0; |
|---|
| 298 |
} |
|---|
| 299 |
|
|---|
| 300 |
sub setupAssetQueue { |
|---|
| 301 |
my $self = shift; |
|---|
| 302 |
my $config = Cornea::Config->new(); |
|---|
| 303 |
my $host = shift; |
|---|
| 304 |
gethostbyname($host) || die "could not resolve $host\n";; |
|---|
| 305 |
(my $tbl = $host) =~ s/\-/_/g; |
|---|
| 306 |
$tbl =~ s/\..*//; |
|---|
| 307 |
my $phost = $config->get('sysinfo::nodename'); |
|---|
| 308 |
(my $ptbl = $phost) =~ s/\-/_/g; |
|---|
| 309 |
$ptbl =~ s/\..*//; |
|---|
| 310 |
my $dbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea", |
|---|
| 311 |
$config->get("DB::user"), |
|---|
| 312 |
$config->get("DB::pass"), |
|---|
| 313 |
{ PrintError => 0, RaiseError => 1, AutoCommit => 1 }, |
|---|
| 314 |
); |
|---|
| 315 |
$dbh->do("set client_min_messages = 'WARNING'"); |
|---|
| 316 |
$dbh->begin_work(); |
|---|
| 317 |
eval { |
|---|
| 318 |
$dbh->do("CREATE TABLE cornea.asset_$tbl |
|---|
| 319 |
(CONSTRAINT asset_${tbl}_uc |
|---|
| 320 |
PRIMARY KEY (service_id, asset_id, representation_id)) |
|---|
| 321 |
INHERITS (cornea.asset)"); |
|---|
| 322 |
$dbh->do("CREATE TABLE cornea.asset_${tbl}_queue |
|---|
| 323 |
(LIKE cornea.asset |
|---|
| 324 |
EXCLUDING CONSTRAINTS |
|---|
| 325 |
EXCLUDING INDEXES)"); |
|---|
| 326 |
$dbh->do(<<SQL); |
|---|
| 327 |
CREATE FUNCTION cornea.populate_asset_${tbl}_queue() RETURNS TRIGGER |
|---|
| 328 |
AS ' |
|---|
| 329 |
DECLARE |
|---|
| 330 |
BEGIN |
|---|
| 331 |
INSERT INTO cornea.asset_${tbl}_queue |
|---|
| 332 |
(asset_id, service_id, representation_id, storage_location) |
|---|
| 333 |
VALUES (NEW.asset_id, NEW.service_id, NEW.representation_id, |
|---|
| 334 |
NEW.storage_location); |
|---|
| 335 |
RETURN NEW; |
|---|
| 336 |
END |
|---|
| 337 |
' LANGUAGE plpgsql |
|---|
| 338 |
SQL |
|---|
| 339 |
$dbh->do("CREATE TRIGGER asset_${tbl}_queue_trigger |
|---|
| 340 |
AFTER INSERT OR UPDATE ON cornea.asset_${ptbl} |
|---|
| 341 |
FOR EACH ROW |
|---|
| 342 |
EXECUTE PROCEDURE cornea.populate_asset_${tbl}_queue()"); |
|---|
| 343 |
$dbh->commit(); |
|---|
| 344 |
}; |
|---|
| 345 |
if($@) { |
|---|
| 346 |
my $err = $@; |
|---|
| 347 |
eval { $dbh->rollback; }; |
|---|
| 348 |
return (-1, "init-metanode first") |
|---|
| 349 |
if $err =~ /"cornea.asset_$ptbl" does not exist/; |
|---|
| 350 |
return (-1, "already initialized") if $err =~ /already exists/; |
|---|
| 351 |
die $err; |
|---|
| 352 |
} |
|---|
| 353 |
return 0; |
|---|
| 354 |
} |
|---|
| 355 |
|
|---|
| 356 |
sub destroyAssetQueue { |
|---|
| 357 |
my $self = shift; |
|---|
| 358 |
my $config = Cornea::Config->new(); |
|---|
| 359 |
my $host = shift; |
|---|
| 360 |
gethostbyname($host) || die "could not resolve $host\n";; |
|---|
| 361 |
(my $tbl = $host) =~ s/\-/_/g; |
|---|
| 362 |
$tbl =~ s/\..*//; |
|---|
| 363 |
my $phost = $config->get('sysinfo::nodename'); |
|---|
| 364 |
(my $ptbl = $phost) =~ s/\-/_/g; |
|---|
| 365 |
$ptbl =~ s/\..*//; |
|---|
| 366 |
my $dbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea", |
|---|
| 367 |
$config->get("DB::user"), |
|---|
| 368 |
$config->get("DB::pass"), |
|---|
| 369 |
{ PrintError => 0, RaiseError => 1, AutoCommit => 1 }, |
|---|
| 370 |
); |
|---|
| 371 |
$dbh->begin_work(); |
|---|
| 372 |
eval { |
|---|
| 373 |
$dbh->do("DROP TRIGGER asset_${tbl}_queue_trigger ON cornea.asset_${ptbl}"); |
|---|
| 374 |
$dbh->do("DROP FUNCTION cornea.populate_asset_${tbl}_queue()"); |
|---|
| 375 |
$dbh->do("DROP TABLE cornea.asset_${tbl}_queue"); |
|---|
| 376 |
$dbh->do("DROP TABLE cornea.asset_$tbl"); |
|---|
| 377 |
$dbh->commit(); |
|---|
| 378 |
}; |
|---|
| 379 |
if($@) { |
|---|
| 380 |
my $err = $@; |
|---|
| 381 |
eval { $dbh->rollback; }; |
|---|
| 382 |
return (-1, "already perfomed") if $err =~ /does not exist/; |
|---|
| 383 |
return (-1, $err); |
|---|
| 384 |
} |
|---|
| 385 |
return 0; |
|---|
| 386 |
} |
|---|
| 387 |
|
|---|
| 388 |
sub initialAssetSynch { |
|---|
| 389 |
my $self = shift; |
|---|
| 390 |
my $config = Cornea::Config->new(); |
|---|
| 391 |
my $host = shift; |
|---|
| 392 |
gethostbyname($host) || die "could not resolve $host\n";; |
|---|
| 393 |
(my $tbl = $host) =~ s/\-/_/g; |
|---|
| 394 |
$tbl =~ s/\..*//; |
|---|
| 395 |
my $phost = $config->get('sysinfo::nodename'); |
|---|
| 396 |
(my $ptbl = $phost) =~ s/\-/_/g; |
|---|
| 397 |
$ptbl =~ s/\..*//; |
|---|
| 398 |
my $total_rows = 0; |
|---|
| 399 |
my $dbh = DBI->connect("dbi:Pg:host=$host;dbname=cornea", |
|---|
| 400 |
$config->get("DB::user"), |
|---|
| 401 |
$config->get("DB::pass"), |
|---|
| 402 |
{ PrintError => 0, RaiseError => 1, AutoCommit => 1 }, |
|---|
| 403 |
); |
|---|
| 404 |
my $ldbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea", |
|---|
| 405 |
$config->get("DB::user"), |
|---|
| 406 |
$config->get("DB::pass"), |
|---|
| 407 |
{ PrintError => 0, RaiseError => 1, AutoCommit => 1 }, |
|---|
| 408 |
); |
|---|
| 409 |
$dbh->begin_work(); |
|---|
| 410 |
$ldbh->begin_work(); |
|---|
| 411 |
my $problem; |
|---|
| 412 |
eval { |
|---|
| 413 |
$problem = "peer not locally initialized"; |
|---|
| 414 |
my $isth = $ldbh->prepare( |
|---|
| 415 |
"INSERT INTO cornea.asset_${tbl} |
|---|
| 416 |
(asset_id, service_id, |
|---|
| 417 |
representation_id, storage_location) |
|---|
| 418 |
VALUES (?,?,?,?::smallint[])"); |
|---|
| 419 |
$problem = "remote not initialized"; |
|---|
| 420 |
$dbh->do("DECLARE initpull CURSOR FOR |
|---|
| 421 |
SELECT asset_id, service_id, |
|---|
| 422 |
representation_id, storage_location |
|---|
| 423 |
FROM cornea.asset_${tbl}"); |
|---|
| 424 |
$problem = "remote peer not initialized"; |
|---|
| 425 |
$dbh->do("TRUNCATE cornea.asset_${ptbl}_queue"); |
|---|
| 426 |
$problem = "error pulling remote data"; |
|---|
| 427 |
my $sth = $dbh->prepare("FETCH FORWARD 10000 FROM initpull"); |
|---|
| 428 |
my $internal_rows_moved; |
|---|
| 429 |
do { |
|---|
| 430 |
$internal_rows_moved = 0; |
|---|
| 431 |
$sth->execute(); |
|---|
| 432 |
while(my @row = $sth->fetchrow()) { |
|---|
| 433 |
$problem = "error insert local data"; |
|---|
| 434 |
$isth->execute(@row); |
|---|
| 435 |
$internal_rows_moved++; |
|---|
| 436 |
$problem = "error pulling remote data"; |
|---|
| 437 |
} |
|---|
| 438 |
$total_rows += $internal_rows_moved; |
|---|
| 439 |
} while($internal_rows_moved); |
|---|
| 440 |
$dbh->do("CLOSE initpull"); |
|---|
| 441 |
$dbh->commit(); |
|---|
| 442 |
$ldbh->commit(); |
|---|
| 443 |
}; |
|---|
| 444 |
if($@) { |
|---|
| 445 |
my $err = $@; |
|---|
| 446 |
eval { $ldbh->rollback; }; |
|---|
| 447 |
eval { $dbh->rollback; }; |
|---|
| 448 |
return (-1, "$problem:\n$err"); |
|---|
| 449 |
} |
|---|
| 450 |
return (0, "$total_rows copied"); |
|---|
| 451 |
} |
|---|
| 452 |
sub pullAssetTable { |
|---|
| 453 |
my $self = shift; |
|---|
| 454 |
my $config = Cornea::Config->new(); |
|---|
| 455 |
my $host = shift; |
|---|
| 456 |
my $timeout = shift || 1; |
|---|
| 457 |
gethostbyname($host) || die "could not resolve $host\n";; |
|---|
| 458 |
(my $tbl = $host) =~ s/\-/_/g; |
|---|
| 459 |
$tbl =~ s/\..*//; |
|---|
| 460 |
my $phost = $config->get('sysinfo::nodename'); |
|---|
| 461 |
(my $ptbl = $phost) =~ s/\-/_/g; |
|---|
| 462 |
$ptbl =~ s/\..*//; |
|---|
| 463 |
my $total_rows = 0; |
|---|
| 464 |
my $dbh = DBI->connect("dbi:Pg:host=$host;dbname=cornea", |
|---|
| 465 |
$config->get("DB::user"), |
|---|
| 466 |
$config->get("DB::pass"), |
|---|
| 467 |
{ PrintError => 0, RaiseError => 1, AutoCommit => 1 }, |
|---|
| 468 |
); |
|---|
| 469 |
my $ldbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea", |
|---|
| 470 |
$config->get("DB::user"), |
|---|
| 471 |
$config->get("DB::pass"), |
|---|
| 472 |
{ PrintError => 0, RaiseError => 1, AutoCommit => 1 }, |
|---|
| 473 |
); |
|---|
| 474 |
while(1) { |
|---|
| 475 |
$dbh->begin_work(); |
|---|
| 476 |
$ldbh->begin_work(); |
|---|
| 477 |
eval { |
|---|
| 478 |
my $from = $dbh->prepare("DELETE FROM cornea.asset_${ptbl}_queue RETURNING *"); |
|---|
| 479 |
my $todel = $ldbh->prepare("DELETE FROM cornea.asset |
|---|
| 480 |
WHERE asset_id = ? and service_id = ? and representation_id = ?"); |
|---|
| 481 |
my $toins = $ldbh->prepare( |
|---|
| 482 |
"INSERT INTO cornea.asset_${tbl} |
|---|
| 483 |
(asset_id, service_id, |
|---|
| 484 |
representation_id, storage_location) |
|---|
| 485 |
VALUES (?,?,?,?::smallint[])"); |
|---|
| 486 |
$from->execute(); |
|---|
| 487 |
while(my @row = $from->fetchrow()) { |
|---|
| 488 |
$todel->execute(@row[0..2]); |
|---|
| 489 |
$toins->execute(@row); |
|---|
| 490 |
$total_rows++; |
|---|
| 491 |
} |
|---|
| 492 |
$ldbh->commit; |
|---|
| 493 |
$dbh->commit; |
|---|
| 494 |
}; |
|---|
| 495 |
if($@) { |
|---|
| 496 |
my $err = $@; |
|---|
| 497 |
eval { $ldbh->rollback; }; |
|---|
| 498 |
eval { $dbh->rollback; }; |
|---|
| 499 |
return (-1, "$total_rows compied.\n$err"); |
|---|
| 500 |
} |
|---|
| 501 |
sleep($timeout); |
|---|
| 502 |
} |
|---|
| 503 |
return (0, "$total_rows copied"); |
|---|
| 504 |
} |
|---|
| 505 |
|
|---|
| 506 |
sub listAssetTables { |
|---|
| 507 |
my $self = shift; |
|---|
| 508 |
$self->_2pc_generic(sub { |
|---|
| 509 |
my $dsn = shift; |
|---|
| 510 |
my $dbh = shift; |
|---|
| 511 |
my $rv = shift; |
|---|
| 512 |
$$rv ||= {}; |
|---|
| 513 |
my $sth = $dbh->prepare(<<SQL); |
|---|
| 514 |
|
|---|
| 515 |
select relname, (case when pg_trigger.oid is null |
|---|
| 516 |
then 'remote' |
|---|
| 517 |
else 'master' end) as partition_type |
|---|
| 518 |
from pg_class |
|---|
| 519 |
join (select inhrelid |
|---|
| 520 |
from pg_inherits |
|---|
| 521 |
where inhparent in (select oid |
|---|
| 522 |
from pg_class |
|---|
| 523 |
where relname='asset' |
|---|
| 524 |
and relnamespace in (select oid |
|---|
| 525 |
from pg_namespace |
|---|
| 526 |
where nspname = 'cornea'))) as c |
|---|
| 527 |
on (pg_class.oid = inhrelid) |
|---|
| 528 |
left join pg_trigger |
|---|
| 529 |
on (c.inhrelid = pg_trigger.tgrelid) |
|---|
| 530 |
|
|---|
| 531 |
SQL |
|---|
| 532 |
$sth->execute(); |
|---|
| 533 |
my $master; |
|---|
| 534 |
my $slaves = {}; |
|---|
| 535 |
while(my @row = $sth->fetchrow()) { |
|---|
| 536 |
if ($row[1] eq 'master') { |
|---|
| 537 |
$master = $row[0]; |
|---|
| 538 |
} |
|---|
| 539 |
else { |
|---|
| 540 |
my $qsize = $dbh->prepare("select count(*) |
|---|
| 541 |
from cornea.$row[0]_queue"); |
|---|
| 542 |
$qsize->execute(); |
|---|
| 543 |
my ($qsize_result) = $qsize->fetchrow(); |
|---|
| 544 |
$qsize->finish; |
|---|
| 545 |
$slaves->{$row[0]} = "$qsize_result rows behind"; |
|---|
| 546 |
} |
|---|
| 547 |
} |
|---|
| 548 |
${$rv}->{$dsn} = { master => $master, slaves => $slaves }; |
|---|
| 549 |
}); |
|---|
| 550 |
} |
|---|
| 551 |
1; |
|---|