root/trunk/perl/lib/Cornea/RecallTable.pm

Revision 46, 17.0 kB (checked in by jesus, 5 years ago)

various fixes

Line 
1 package Cornea::RecallTable;
2 use strict;
3 use Cornea::Config;
4 use Cornea::Utils;
5 use YAML;
6 use DBI;
7
8 sub __connect {
9   my $self = shift;
10   my $config = Cornea::Config->new();
11   my $dbh;
12   my $failed_err = undef;
13   my $dsns = $config->get_list("DB::dsn");
14   Cornea::Utils::shuffle($dsns);
15   foreach my $dsn (@$dsns) {
16     eval {
17       $dbh = DBI->connect($dsn,
18                           $config->get("DB::user"),
19                           $config->get("DB::pass"),
20                           { PrintError => 0, RaiseError => 1 },
21                          );
22     };
23     die "$dsn: $@\n" if $@;
24     last unless $@;
25   }
26   print STDERR "$failed_err\n" if $failed_err;
27   $self->{dbh} = $dbh;
28 }
29 sub __reconnect {
30   my $self = shift;
31   $self->{dbh} = undef;
32   $self->__connect();
33 }
34 sub new {
35   my $class = shift;
36   my $self = bless { }, $class;
37   $self;
38 }
39
40 sub insert {
41   my $self = shift;
42   my ($serviceId, $assetId, $repId, $snl) = @_;
43   my $tried = 0;
44   die 'bad parameters' unless UNIVERSAL::isa($snl, 'Cornea::StorageNodeList');
45   my $snl_arr = '{' . join(',', map { $_->id() }
46                                     ($snl->items())) . '}';
47  again:
48   eval {
49     my $sth = $self->{dbh}->prepare("select make_asset(?,?,?,?::smallint[])");
50     $sth->execute($serviceId, $assetId, $repId, $snl_arr);
51     $sth->finish();
52   };
53   if ($@) {
54     unless ($tried++) { $self->__reconnect();  goto again; }
55     die $@ if $@;
56   }
57   return 1;
58 }
59
60 sub find {
61   my $self = shift;
62   my ($serviceId, $assetId, $repId) = @_;
63   my $sth = $self->{dbh}->prepare("select * from get_asset_location(?,?,?)");
64   my $tried = 0;
65   my $C;
66  again:
67   eval {
68     $C = Cornea::StorageNodeList->new();
69     $sth->execute($serviceId, $assetId, $repId);
70     while(my $node = $sth->fetchrow_hashref()) {
71       $C->add(Cornea::StorageNode->new_from_row($node));
72     }
73     $sth->finish();
74   };
75   if ($@) {
76     unless ($tried++) { $self->__reconnect();  goto again; }
77     die $@ if $@;
78   }
79   return $C;
80 }
81
82 sub getNodes {
83   my $self = shift;
84   my $type = shift;
85   my $tried = 0;
86   my $snl;
87   my $bind = undef;
88   if (defined($type)) {
89     $bind = (ref $type eq 'ARRAY') ? ('{'.join(',', @$type).'}') : "{$type}";
90   }
91  again:
92   eval {
93     $snl = Cornea::StorageNodeList->new();
94     my $sth = $self->{dbh}->prepare("select * from get_storage_nodes(?::storagestate[])");
95     $sth->execute($bind);
96     while(my $row = $sth->fetchrow_hashref()) {
97       $snl->add(Cornea::StorageNode->new_from_row($row));
98     }
99     $sth->finish();
100   };
101   if ($@) {
102     unless ($tried++) { $self->__reconnect();  goto again; }
103     die $@ if $@;
104   }
105   return $snl;
106 }
107
108 sub _2pc_generic {
109   my $self = shift;
110   my $code = shift;
111   my $config = Cornea::Config->new();
112   my $dsns = $config->get_list("DB::dsn");
113   my $rv = undef;
114   my $named_txn = "cornea_$$";
115   my @dbh = map {
116     my $dbh = DBI->connect($_,
117                  $config->get("DB::user"),
118                  $config->get("DB::pass"),
119                  { PrintError => 0, RaiseError => 1, AutoCommit => 1 }
120                 );
121     $dbh->begin_work();
122     [$_, $dbh];
123   } @$dsns;
124   eval {
125     foreach (@dbh) {
126       &$code($_->[0], $_->[1], \$rv);
127     }
128     foreach (@dbh) { $_->[1]->do("prepare transaction '$named_txn'"); }
129     foreach (@dbh) { $_->[1]->do("commit prepared '$named_txn'"); }
130   };
131   if ($@) {
132     my $real_error = $@;
133     $rv = undef;
134     eval { foreach (@dbh) { $_->[1]->do("rollback prepared '$named_txn'"); } };
135     die $real_error;
136   }
137   foreach (@dbh) { $_->[1]->disconnect; }
138   return $rv;
139 }
140      
141 sub updateNode {
142   my $self = shift;
143   my $ip = shift;
144   my $attr = shift;
145   my $config = Cornea::Config->new();
146   die "bad state"
147     unless $attr->{state} =~ /^(?:open|closed|offline|decommissioned)$/;
148   die "storage must be a number"
149     unless $attr->{total_storage} =~ /^[1-9]\d*$/ and
150            $attr->{used_storage} =~ /^[1-9]\d*$/;
151   die "locaion must be dc/cage/row/rack/pdu"
152     unless !defined($attr->{location}) or
153            $attr->{location} =~ /^[^\/]+(?:\/[^\/]+){4}$/;
154   die "fqdn must not be blank"
155     unless !defined($attr->{fqdn}) or length($attr->{fqdn});
156
157   if(defined($attr->{location}) || defined($attr->{fqdn})) {
158     return $self->_2pc_generic(sub {
159       eval {
160         my $dsn = shift;
161         my $dbh = shift;
162         my $rv = shift;
163         my $sth = $dbh->prepare("select set_storage_node(?,?,?,?,?,?,?)");
164         my $storage_node_id = $$rv;
165         $sth->execute($attr->{state},
166                       $attr->{total_storage}, $attr->{used_storage},
167                       $attr->{location}, $attr->{fqdn}, $ip, $storage_node_id);
168         my ($returned_storade_node_id) = $sth->fetchrow();
169         $$rv ||= $returned_storade_node_id;
170         $sth->finish();
171         die "Storage node trickery! (this should never happen).\n"
172           if($$rv != $returned_storade_node_id);
173       };
174       if ($@) {
175         die "location and fqdn must be specified for first-time update\n"
176           if $@ =~ /null value in column "(?:location|fqdn)"/;
177         die $@ if $@;
178       }
179     });
180   }
181   my $dsns = $config->get_list("DB::dsn");
182   foreach (@$dsns) {
183     my $dbh = DBI->connect($_,
184                  $config->get("DB::user"),
185                  $config->get("DB::pass"),
186                  { PrintError => 0, RaiseError => 1, AutoCommit => 1 }
187                 );
188     my $tried = 0;
189    again:
190     eval {
191       print STDERR "$_ -> updating node info\n" if $main::DEBUG;
192       my $sth = $dbh->prepare("select set_storage_node(?,?,?,?,?,?,?)");
193       $sth->execute($attr->{state},
194                     $attr->{total_storage}, $attr->{used_storage},
195                     $attr->{location}, $attr->{fqdn}, $ip, undef);
196       $sth->finish();
197     };
198     if ($@) {
199       my $err = $@;
200       $err = "failed: no match by IP\n"
201         if $@ =~ /null value in column "(?:location|fqdn)"/;
202       print STDERR "\t$@\n" if $main::DEBUG;
203       unless ($tried++) { $self->__reconnect();  goto again; }
204       print STDERR "$_: $err";
205     }
206   }
207   return 0;
208 }
209
210 sub updateRepInfo {
211   my $self = shift;
212   my $service_id = shift;
213   my $rep_id = shift;
214   my $attr = shift;
215   eval "use $attr->{class};";
216   return (-1, "Cannot load $attr->{class}") if $@;
217   return $self->_2pc_generic(sub {
218     my $dsn = shift;
219     my $dbh = shift;
220     my $rv = shift;
221     my $sth = $dbh->prepare("select make_representation(?,?,?,?,?,?,?)");
222     $sth->execute($service_id, $rep_id, $attr->{name},
223                   $attr->{distance}, $attr->{count},
224                   $attr->{parent}, $attr->{class});
225     $sth->finish();
226   });
227 }
228
229 sub repInfo {
230   my $self = shift;
231   my ($serviceId, $repId) = @_;
232   my $tried = 0;
233   my $row;
234  again:
235   eval {
236     my $sth = $self->{dbh}->prepare("select * from get_representation(?,?)");
237     $sth->execute($serviceId, $repId);
238     $row = $sth->fetchrow_hashref();
239     $sth->finish();
240   };
241   if ($@) {
242     unless ($tried++) { $self->__reconnect();  goto again; }
243     die $@ if $@;
244   }
245   return $row ? Cornea::RepresentationInfo->new_from_row($row) : undef;
246 }
247
248 sub repInfoDependents {
249   my $self = shift;
250   my ($serviceId, $repId) = @_;
251   my $tried = 0;
252   my @deps;
253  again:
254   eval {
255     @deps = ();
256     my $sth = $self->{dbh}->prepare("select * from get_representation_dependents(?,?)");
257     $sth->execute($serviceId, $repId);
258     while(my $row = $sth->fetchrow_hashref()) {
259       push @deps, Cornea::RepresentationInfo->new_from_row($row);
260     }
261     $sth->finish();
262   };
263   if ($@) {
264     unless ($tried++) { $self->__reconnect();  goto again; }
265     die $@ if $@;
266   }
267   return @deps;
268 }
269
270 sub initAssetTable {
271   my $self = shift;
272   my $config = Cornea::Config->new();
273   my $host = $config->get('sysinfo::nodename');
274   (my $tbl = $host) =~ s/\-/_/g;
275   $tbl =~ s/\..*//;
276   my $dbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea",
277                          $config->get("DB::user"),
278                          $config->get("DB::pass"),
279                          { PrintError => 0, RaiseError => 1, AutoCommit => 1 },
280                         );
281   $dbh->do("set client_min_messages = 'WARNING'");
282   $dbh->begin_work();
283   eval {
284     $dbh->do("CREATE TABLE cornea.asset_$tbl
285                     (CONSTRAINT asset_${tbl}_pkey
286                         PRIMARY KEY (service_id, asset_id, representation_id))
287                      INHERITS (cornea.asset)");
288     $dbh->do("CREATE OR REPLACE FUNCTION cornea.make_asset(in_service_id integer, in_asset_id bigint, in_repid integer, in_storage_location smallint[]) RETURNS void AS 'delete from asset where service_id = \$1 and asset_id = \$2 and representation_id = \$3; insert into asset_${tbl} (service_id, asset_id, representation_id, storage_location) VALUES (\$1, \$2, \$3, \$4);' LANGUAGE sql");
289     $dbh->commit();
290   };
291   if($@) {
292     my $err = $@;
293     eval { $dbh->rollback; };
294     return (-1, "already initialized") if $err =~ /already exists/;
295     return (-1, $err);
296   }
297   return 0;
298 }
299
300 sub setupAssetQueue {
301   my $self = shift;
302   my $config = Cornea::Config->new();
303   my $host = shift;
304   gethostbyname($host) || die "could not resolve $host\n";;
305   (my $tbl = $host) =~ s/\-/_/g;
306   $tbl =~ s/\..*//;
307   my $phost = $config->get('sysinfo::nodename');
308   (my $ptbl = $phost) =~ s/\-/_/g;
309   $ptbl =~ s/\..*//;
310   my $dbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea",
311                          $config->get("DB::user"),
312                          $config->get("DB::pass"),
313                          { PrintError => 0, RaiseError => 1, AutoCommit => 1 },
314                         );
315   $dbh->do("set client_min_messages = 'WARNING'");
316   $dbh->begin_work();
317   eval {
318     $dbh->do("CREATE TABLE cornea.asset_$tbl
319                     (CONSTRAINT asset_${tbl}_uc
320                         PRIMARY KEY (service_id, asset_id, representation_id))
321                      INHERITS (cornea.asset)");
322     $dbh->do("CREATE TABLE cornea.asset_${tbl}_queue
323                      (LIKE cornea.asset
324                       EXCLUDING CONSTRAINTS
325                       EXCLUDING INDEXES)");
326     $dbh->do(<<SQL);
327 CREATE FUNCTION cornea.populate_asset_${tbl}_queue() RETURNS TRIGGER
328   AS '
329 DECLARE
330 BEGIN
331   INSERT INTO cornea.asset_${tbl}_queue
332               (asset_id, service_id, representation_id, storage_location)
333        VALUES (NEW.asset_id, NEW.service_id, NEW.representation_id,
334                NEW.storage_location);
335   RETURN NEW;
336 END
337 ' LANGUAGE plpgsql
338 SQL
339     $dbh->do("CREATE TRIGGER asset_${tbl}_queue_trigger
340                 AFTER INSERT OR UPDATE ON cornea.asset_${ptbl}
341                 FOR EACH ROW
342                 EXECUTE PROCEDURE cornea.populate_asset_${tbl}_queue()");
343     $dbh->commit();
344   };
345   if($@) {
346     my $err = $@;
347     eval { $dbh->rollback; };
348     return (-1, "init-metanode first")
349       if $err =~ /"cornea.asset_$ptbl" does not exist/;
350     return (-1, "already initialized") if $err =~ /already exists/;
351     die $err;
352   }
353   return 0;
354 }
355
356 sub destroyAssetQueue {
357   my $self = shift;
358   my $config = Cornea::Config->new();
359   my $host = shift;
360   gethostbyname($host) || die "could not resolve $host\n";;
361   (my $tbl = $host) =~ s/\-/_/g;
362   $tbl =~ s/\..*//;
363   my $phost = $config->get('sysinfo::nodename');
364   (my $ptbl = $phost) =~ s/\-/_/g;
365   $ptbl =~ s/\..*//;
366   my $dbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea",
367                          $config->get("DB::user"),
368                          $config->get("DB::pass"),
369                          { PrintError => 0, RaiseError => 1, AutoCommit => 1 },
370                         );
371   $dbh->begin_work();
372   eval {
373     $dbh->do("DROP TRIGGER asset_${tbl}_queue_trigger ON cornea.asset_${ptbl}");
374     $dbh->do("DROP FUNCTION cornea.populate_asset_${tbl}_queue()");
375     $dbh->do("DROP TABLE cornea.asset_${tbl}_queue");
376     $dbh->do("DROP TABLE cornea.asset_$tbl");
377     $dbh->commit();
378   };
379   if($@) {
380     my $err = $@;
381     eval { $dbh->rollback; };
382     return (-1, "already perfomed") if $err =~ /does not exist/;
383     return (-1, $err);
384   }
385   return 0;
386 }
387
388 sub initialAssetSynch {
389   my $self = shift;
390   my $config = Cornea::Config->new();
391   my $host = shift;
392   gethostbyname($host) || die "could not resolve $host\n";;
393   (my $tbl = $host) =~ s/\-/_/g;
394   $tbl =~ s/\..*//;
395   my $phost = $config->get('sysinfo::nodename');
396   (my $ptbl = $phost) =~ s/\-/_/g;
397   $ptbl =~ s/\..*//;
398   my $total_rows = 0;
399   my $dbh = DBI->connect("dbi:Pg:host=$host;dbname=cornea",
400                          $config->get("DB::user"),
401                          $config->get("DB::pass"),
402                          { PrintError => 0, RaiseError => 1, AutoCommit => 1 },
403                         );
404   my $ldbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea",
405                          $config->get("DB::user"),
406                          $config->get("DB::pass"),
407                          { PrintError => 0, RaiseError => 1, AutoCommit => 1 },
408                         );
409   $dbh->begin_work();
410   $ldbh->begin_work();
411   my $problem;
412   eval {
413     $problem = "peer not locally initialized";
414     my $isth = $ldbh->prepare(
415         "INSERT INTO cornea.asset_${tbl}
416                     (asset_id, service_id,
417                      representation_id, storage_location)
418               VALUES (?,?,?,?::smallint[])");
419     $problem = "remote not initialized";
420     $dbh->do("DECLARE initpull CURSOR FOR
421                SELECT asset_id, service_id,
422                       representation_id, storage_location
423                  FROM cornea.asset_${tbl}");
424     $problem = "remote peer not initialized";
425     $dbh->do("TRUNCATE cornea.asset_${ptbl}_queue");
426     $problem = "error pulling remote data";
427     my $sth = $dbh->prepare("FETCH FORWARD 10000 FROM initpull");
428     my $internal_rows_moved;
429     do {
430       $internal_rows_moved = 0;
431       $sth->execute();
432       while(my @row = $sth->fetchrow()) {
433         $problem = "error insert local data";
434         $isth->execute(@row);
435         $internal_rows_moved++;
436         $problem = "error pulling remote data";
437       }
438       $total_rows += $internal_rows_moved;
439     } while($internal_rows_moved);
440     $dbh->do("CLOSE initpull");
441     $dbh->commit();
442     $ldbh->commit();
443   };
444   if($@) {
445     my $err = $@;
446     eval { $ldbh->rollback; };
447     eval { $dbh->rollback; };
448     return (-1, "$problem:\n$err");
449   }
450   return (0, "$total_rows copied");
451 }
452 sub pullAssetTable {
453   my $self = shift;
454   my $config = Cornea::Config->new();
455   my $host = shift;
456   my $timeout = shift || 1;
457   gethostbyname($host) || die "could not resolve $host\n";;
458   (my $tbl = $host) =~ s/\-/_/g;
459   $tbl =~ s/\..*//;
460   my $phost = $config->get('sysinfo::nodename');
461   (my $ptbl = $phost) =~ s/\-/_/g;
462   $ptbl =~ s/\..*//;
463   my $total_rows = 0;
464   my $dbh = DBI->connect("dbi:Pg:host=$host;dbname=cornea",
465                          $config->get("DB::user"),
466                          $config->get("DB::pass"),
467                          { PrintError => 0, RaiseError => 1, AutoCommit => 1 },
468                         );
469   my $ldbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea",
470                          $config->get("DB::user"),
471                          $config->get("DB::pass"),
472                          { PrintError => 0, RaiseError => 1, AutoCommit => 1 },
473                         );
474   while(1) {
475     $dbh->begin_work();
476     $ldbh->begin_work();
477     eval {
478       my $from = $dbh->prepare("DELETE FROM cornea.asset_${ptbl}_queue RETURNING *");
479       my $todel = $ldbh->prepare("DELETE FROM cornea.asset
480                  WHERE asset_id = ? and service_id = ? and representation_id = ?");
481       my $toins = $ldbh->prepare(
482           "INSERT INTO cornea.asset_${tbl}
483                       (asset_id, service_id,
484                        representation_id, storage_location)
485                 VALUES (?,?,?,?::smallint[])");
486       $from->execute();
487       while(my @row = $from->fetchrow()) {
488         $todel->execute(@row[0..2]);
489         $toins->execute(@row);
490         $total_rows++;
491       }
492       $ldbh->commit;
493       $dbh->commit;
494     };
495     if($@) {
496       my $err = $@;
497       eval { $ldbh->rollback; };
498       eval { $dbh->rollback; };
499       return (-1, "$total_rows compied.\n$err");
500     }
501     sleep($timeout);
502   }
503   return (0, "$total_rows copied");
504 }
505
506 sub listAssetTables {
507   my $self = shift;
508   $self->_2pc_generic(sub {
509     my $dsn = shift;
510     my $dbh = shift;
511     my $rv = shift;
512     $$rv ||= {};
513     my $sth = $dbh->prepare(<<SQL);
514
515     select relname, (case when pg_trigger.oid is null
516                           then 'remote'
517                           else 'master' end) as partition_type
518       from pg_class
519       join (select inhrelid
520               from pg_inherits
521              where inhparent in (select oid
522                                    from pg_class
523                                   where relname='asset'
524                                     and relnamespace in (select oid
525                                                            from pg_namespace
526                                                           where nspname = 'cornea'))) as c
527         on (pg_class.oid = inhrelid)
528  left join pg_trigger
529         on (c.inhrelid = pg_trigger.tgrelid)
530
531 SQL
532     $sth->execute();
533     my $master;
534     my $slaves = {};
535     while(my @row = $sth->fetchrow()) {
536       if ($row[1] eq 'master') {
537         $master = $row[0];
538       }
539       else {
540         my $qsize = $dbh->prepare("select count(*)
541                                     from cornea.$row[0]_queue");
542         $qsize->execute();
543         my ($qsize_result) = $qsize->fetchrow();
544         $qsize->finish;
545         $slaves->{$row[0]} = "$qsize_result rows behind";
546       }
547     }
548     ${$rv}->{$dsn} = { master => $master, slaves => $slaves };
549   });
550 }
551 1;
Note: See TracBrowser for help on using the browser.