root/trunk/perl/lib/Cornea/RecallTable.pm

Revision 24, 14.5 kB (checked in by jesus, 5 years ago)

fix replication to support updates and 'remastering' of asset metadata

Line 
1 package Cornea::RecallTable;
2 use strict;
3 use Cornea::Config;
4 use Cornea::Utils;
5 use DBI;
6
# Connect to one of the configured databases and cache the handle.
#
# Reads the "DB::dsn" list and the "DB::user"/"DB::pass" credentials from
# Cornea::Config, hands the DSN list to Cornea::Utils::shuffle (presumably
# an in-place shuffle that spreads load -- confirm against Cornea::Utils)
# and then tries every DSN in turn until one of them connects.
#
# On success the handle is stored in $self->{dbh} (and returned); DSNs
# that failed before the successful one are reported on STDERR.
# Dies with the collected connection errors if no DSN could be reached.
sub __connect {
  my $self = shift;
  my $config = Cornea::Config->new();
  my $dsns = $config->get_list("DB::dsn");
  Cornea::Utils::shuffle($dsns);
  my $dbh;
  my @failures;
  foreach my $dsn (@$dsns) {
    # RaiseError turns a failed connect into an exception; trap it so the
    # remaining DSNs are still tried instead of aborting on the first one.
    $dbh = eval {
      DBI->connect($dsn,
                   $config->get("DB::user"),
                   $config->get("DB::pass"),
                   { PrintError => 0, RaiseError => 1 },
                  );
    };
    last if $dbh;
    my $err = $@ || "connect returned no handle";
    chomp $err;
    push @failures, "$dsn: $err";
  }
  unless ($dbh) {
    die "no DB::dsn configured\n" unless @failures;
    die join("\n", @failures) . "\n";
  }
  # A later DSN worked; still tell the operator which ones did not.
  print STDERR join("\n", @failures) . "\n" if @failures;
  $self->{dbh} = $dbh;
}
# Throw away the cached database handle and establish a new connection
# through __connect(); returns whatever __connect() returns.
sub __reconnect {
  my ($self) = @_;
  undef $self->{dbh};
  return $self->__connect();
}
# Constructor: returns an empty object blessed into the invoking class.
# No database connection is opened here.
sub new {
  my ($class) = @_;
  return bless({}, $class);
}
38
# Record an asset's storage locations by calling the make_asset() database
# function.
#
#   $serviceId, $assetId, $repId - identify the asset representation
#   $snl - a Cornea::StorageNodeList; the ids of its items are sent as an
#          integer array literal ("{1,2,3}")
#
# Returns 1 on success.  Dies with 'bad parameters' if $snl is not a
# Cornea::StorageNodeList.  On a database error the connection is
# re-established once and the call retried; a second failure is rethrown.
sub insert {
  my $self = shift;
  my ($serviceId, $assetId, $repId, $snl) = @_;
  # The function is UNIVERSAL::isa (lower case); the upper-case spelling
  # is an undefined subroutine and made every call die.
  die 'bad parameters' unless UNIVERSAL::isa($snl, 'Cornea::StorageNodeList');
  # NOTE(review): "storeagenodeid" looks like a typo for "storagenodeid";
  # left as is because Cornea::StorageNode is not visible from here.
  my $snl_arr = '{' . join(',', map { $_->storeagenodeid() }
                                    ($snl->items())) . '}';
  my $err;
  foreach my $attempt (1, 2) {
    my $ok = eval {
      my $sth = $self->{dbh}->prepare("select make_asset(?,?,?,?::int[])");
      $sth->execute($serviceId, $assetId, $repId, $snl_arr);
      $sth->finish();
      1;
    };
    return 1 if $ok;
    $err = $@ || "unknown error\n";
    # Only the first failure triggers a reconnect-and-retry.
    $self->__reconnect() if $attempt == 1;
  }
  die $err;
}
58
# Look up where an asset representation is stored.
#
# Runs get_asset_location($serviceId, $assetId, $repId) and turns every
# returned row into a Cornea::StorageNode (via new_from_row), collected in
# a fresh Cornea::StorageNodeList which is returned.
#
# On a database error the connection is re-established once and the query
# retried; a second failure is rethrown.
sub find {
  my $self = shift;
  my ($serviceId, $assetId, $repId) = @_;
  my $C;
  my $err;
  foreach my $attempt (1, 2) {
    my $ok = eval {
      $C = Cornea::StorageNodeList->new();
      # Prepare inside the eval and on the *current* handle: new() does
      # not connect, and a statement prepared before a reconnect belongs
      # to the discarded handle, so the retry could never succeed.
      my $sth = $self->{dbh}->prepare("select get_asset_location(?,?,?)");
      $sth->execute($serviceId, $assetId, $repId);
      while(my $node = $sth->fetchrow_hashref()) {
        $C->add(Cornea::StorageNode->new_from_row($node));
      }
      $sth->finish();
      1;
    };
    return $C if $ok;
    $err = $@ || "unknown error\n";
    $self->__reconnect() if $attempt == 1;
  }
  die $err;
}
80
# Fetch the storage nodes that are in state $type.
#
# Runs get_storage_nodes_by_state($type) and wraps each row in a
# Cornea::StorageNode, returning them in a Cornea::StorageNodeList.
# A failed attempt triggers one reconnect and one retry; the error of a
# second failure is rethrown.
sub getNodes {
  my ($self, $type) = @_;
  my $snl;
  for my $is_retry (0, 1) {
    eval {
      $snl = Cornea::StorageNodeList->new();
      my $query = $self->{dbh}->prepare("select * from get_storage_nodes_by_state(?)");
      $query->execute($type);
      while (my $record = $query->fetchrow_hashref()) {
        $snl->add(Cornea::StorageNode->new_from_row($record));
      }
      $query->finish();
    };
    last unless $@;
    die $@ if $is_retry;
    $self->__reconnect();
  }
  return $snl;
}
102
# Register or update a storage node on every configured database using a
# two-phase commit, so that all databases agree on the node's id.
#
#   $ip   - address of the storage node
#   $attr - hashref; the keys state, total_storage, used_storage, location
#           and fqdn are passed to set_storage_node()
#
# For each "DB::dsn" a connection is opened and a transaction started.
# set_storage_node() is called on each one; the id returned by the first
# database is passed to the following ones and every database must return
# that same id.  Then all transactions are PREPAREd and finally all are
# COMMITted.
#
# Returns the storage node id.  Dies on any failure after best-effort
# cleanup of every connection.
sub _2pc_add_storage {
  my $ip = shift;
  my $attr = shift;
  my $storage_node_id;
  my $config = Cornea::Config->new();
  my $dsns = $config->get_list("DB::dsn");
  my @handles;
  # Per-handle progress flags (indexed like @handles) so that cleanup only
  # rolls back prepared transactions this call actually created.
  my (@prepared, @committed);
  my $ok = eval {
    # Connecting happens inside the eval so that a failure on a later DSN
    # still cleans up the connections that were already opened.
    foreach my $dsn (@$dsns) {
      my $handle = DBI->connect($dsn,
                   $config->get("DB::user"),
                   $config->get("DB::pass"),
                   { PrintError => 0, RaiseError => 1, AutoCommit => 1 }
                  );
      push @handles, $handle;
      $handle->begin_work();
    }
    foreach my $handle (@handles) {
      eval {
        my $sth = $handle->prepare("select set_storage_node(?,?,?,?,?,?,?)");
        $sth->execute($attr->{state},
                      $attr->{total_storage}, $attr->{used_storage},
                      $attr->{location}, $attr->{fqdn}, $ip, $storage_node_id);
        my ($returned_storage_node_id) = $sth->fetchrow();
        $storage_node_id ||= $returned_storage_node_id;
        $sth->finish();
        die "Storage node trickery! (this should never happen).\n"
          if($storage_node_id != $returned_storage_node_id);
      };
      if ($@) {
        # Translate the database's NOT NULL violation into a hint the
        # operator can act on.
        die "location must be specified for first-time update\n"
          if $@ =~ /null value in column "location"/;
        die $@;
      }
    }
    foreach my $i (0 .. $#handles) {
      $handles[$i]->do("prepare transaction 'cornea_node'");
      $prepared[$i] = 1;
    }
    foreach my $i (0 .. $#handles) {
      $handles[$i]->do("commit prepared 'cornea_node'");
      $committed[$i] = 1;
    }
    1;
  };
  unless ($ok) {
    my $real_error = $@ || "unknown error\n";
    # Best-effort cleanup, one eval per step so a failure on one handle
    # does not prevent cleaning up the others.
    foreach my $i (0 .. $#handles) {
      # End whatever transaction is still open on this session first;
      # PostgreSQL refuses ROLLBACK PREPARED inside a transaction block.
      # NOTE(review): confirm how DBD::Pg tracks transaction state after
      # "prepare transaction" has been issued through do().
      eval { $handles[$i]->rollback; };
      eval { $handles[$i]->do("rollback prepared 'cornea_node'"); }
        if $prepared[$i] && !$committed[$i];
      eval { $handles[$i]->disconnect; };
    }
    die $real_error;
  }
  foreach my $handle (@handles) { $handle->disconnect; }
  # $@ is empty at this point, so give the failure a real message.
  die "set_storage_node did not return a storage node id\n"
    unless($storage_node_id);
  return $storage_node_id;
}
# Validate and publish a storage node's status.
#
#   $ip   - address of the storage node
#   $attr - hashref with:
#             state         one of open|closed|offline|decommissioned
#             total_storage positive integer
#             used_storage  non-negative integer
#             location      optional, five "/"-separated components
#             fqdn          optional, must not be empty when given
#
# Dies if validation fails.  When location or fqdn is supplied the update
# goes through _2pc_add_storage() and its storage node id is returned.
# Otherwise set_storage_node() is run on every "DB::dsn" independently;
# each database gets one retry, remaining errors are printed to STDERR,
# and 0 is returned.
sub updateNode {
  my $self = shift;
  my $ip = shift;
  my $attr = shift;
  die "bad state"
    unless $attr->{state} =~ /^(?:open|closed|offline|decommissioned)$/;
  # An empty node legitimately reports 0 bytes used, so 0 is accepted for
  # used_storage (but not for total_storage).
  die "storage must be a number"
    unless $attr->{total_storage} =~ /^[1-9]\d*$/ and
           $attr->{used_storage} =~ /^(?:0|[1-9]\d*)$/;
  die "locaion must be dc/cage/row/rack/pdu"
    unless !defined($attr->{location}) or
           $attr->{location} =~ /^[^\/]+(?:\/[^\/]+){4}$/;
  die "fqdn must not be blank"
    unless !defined($attr->{fqdn}) or length($attr->{fqdn});

  if(defined($attr->{location}) || defined($attr->{fqdn})) {
    return _2pc_add_storage($ip, $attr);
  }
  my $config = Cornea::Config->new();
  my $dsns = $config->get_list("DB::dsn");
  foreach my $dsn (@$dsns) {
    my $err;
    foreach my $attempt (1, 2) {
      my $dbh;
      my $ok = eval {
        $dbh = DBI->connect($dsn,
                     $config->get("DB::user"),
                     $config->get("DB::pass"),
                     { PrintError => 0, RaiseError => 1, AutoCommit => 1 }
                    );
        # Run the statement on the handle for *this* DSN; using
        # $self->{dbh} here updated one database repeatedly and left the
        # others untouched.
        my $sth = $dbh->prepare("select set_storage_node(?,?,?,?,?,?,?)");
        $sth->execute($attr->{state},
                      $attr->{total_storage}, $attr->{used_storage},
                      $attr->{location}, $attr->{fqdn}, $ip, undef);
        $sth->finish();
        1;
      };
      $err = $ok ? undef : ($@ || "unknown error\n");
      eval { $dbh->disconnect; } if $dbh;
      last if $ok;
    }
    # Best effort: a database that cannot be updated is reported, not fatal.
    print STDERR $err if defined $err;
  }
  return 0;
}
192
# Load the definition of one representation.
#
# Runs get_representation($serviceId, $repId), takes the first row and
# hands it to Cornea::RepresentationInfo->new_from_row, whose result is
# returned.  A failed attempt triggers one reconnect and one retry; the
# error of a second failure is rethrown.
sub repInfo {
  my ($self, $serviceId, $repId) = @_;
  my $row;
  for my $is_retry (0, 1) {
    eval {
      my $query = $self->{dbh}->prepare("select * from get_representation(?,?)");
      $query->execute($serviceId, $repId);
      $row = $query->fetchrow_hashref();
      $query->finish();
    };
    last unless $@;
    die $@ if $is_retry;
    $self->__reconnect();
  }
  return Cornea::RepresentationInfo->new_from_row($row);
}
211
# List the representations that depend on the given one.
#
# Runs get_representation_dependents($serviceId, $repId) and returns a
# list with one Cornea::RepresentationInfo (built via new_from_row) per
# row.  A failed attempt triggers one reconnect and one retry; the error
# of a second failure is rethrown.
sub repInfoDependents {
  my ($self, $serviceId, $repId) = @_;
  my @deps;
  for my $is_retry (0, 1) {
    eval {
      # Start from scratch so a half-finished first attempt leaves nothing
      # behind.
      @deps = ();
      my $query = $self->{dbh}->prepare("select * from get_representation_dependents(?,?)");
      $query->execute($serviceId, $repId);
      while (my $record = $query->fetchrow_hashref()) {
        push @deps, Cornea::RepresentationInfo->new_from_row($record);
      }
      $query->finish();
    };
    last unless $@;
    die $@ if $is_retry;
    $self->__reconnect();
  }
  return @deps;
}
233
# Create this node's asset table on the local database.
#
# The table name is derived from the configured 'sysinfo::nodename':
# dashes become underscores and everything from the first dot on is
# dropped.  In one transaction the table cornea.asset_<name> is created
# (inheriting cornea.asset) and cornea.make_asset() is (re)defined to
# write into it.
#
# Returns 0 on success, (-1, "already initialized") if the database
# reports that something already exists, and (-1, $error) otherwise.
sub initAssetTable {
  my ($self) = @_;
  my $config = Cornea::Config->new();
  my $tbl = $config->get('sysinfo::nodename');
  $tbl =~ s/\-/_/g;
  $tbl =~ s/\..*//;
  my $db = DBI->connect("dbi:Pg:host=localhost;dbname=cornea",
                        $config->get("DB::user"),
                        $config->get("DB::pass"),
                        { PrintError => 0, RaiseError => 1, AutoCommit => 1 },
                       );
  $db->begin_work();
  eval {
    $db->do("CREATE TABLE cornea.asset_$tbl
                    (CONSTRAINT asset_${tbl}_pkey
                        PRIMARY KEY (service_id, asset_id, representation_id))
                     INHERITS (cornea.asset)");
    $db->do("CREATE OR REPLACE FUNCTION cornea.make_asset(in_service_id integer, in_asset_id bigint, in_repid integer, in_storage_location smallint[]) RETURNS void AS 'delete from asset where service_id = \$1 and asset_id = \$2 and representation_id = \$3; insert into asset_${tbl} (service_id, asset_id, representation_id, storage_location) VALUES (\$1, \$2, \$3, \$4);' LANGUAGE sql");
    $db->commit();
  };
  my $failure = $@;
  return 0 unless $failure;
  eval { $db->rollback; };
  return $failure =~ /already exists/ ? (-1, "already initialized")
                                      : (-1, $failure);
}
262
# Prepare the local database for replicating with peer $host.
#
# Table name parts are derived from $host (the peer) and from the
# configured 'sysinfo::nodename' (this node): dashes become underscores
# and everything from the first dot on is dropped.  In one transaction
# this creates the peer's asset table, a queue table for the peer, a
# trigger function that copies new rows into that queue, and a trigger on
# this node's own asset table that fires it.
#
# Dies if $host does not resolve.  Returns 0 on success,
# (-1, "init-metanode first") if this node's asset table is missing,
# (-1, "already initialized") if objects already exist; any other
# database error is rethrown.
sub setupAssetQueue {
  my ($self, $host) = @_;
  my $config = Cornea::Config->new();
  die "could not resolve $host\n" unless gethostbyname($host);
  my $tbl = $host;
  $tbl =~ s/\-/_/g;
  $tbl =~ s/\..*//;
  my $ptbl = $config->get('sysinfo::nodename');
  $ptbl =~ s/\-/_/g;
  $ptbl =~ s/\..*//;
  my $db = DBI->connect("dbi:Pg:host=localhost;dbname=cornea",
                        $config->get("DB::user"),
                        $config->get("DB::pass"),
                        { PrintError => 0, RaiseError => 1, AutoCommit => 1 },
                       );
  $db->begin_work();
  eval {
    $db->do("CREATE TABLE cornea.asset_$tbl
                    (CONSTRAINT asset_${tbl}_uc
                        PRIMARY KEY (service_id, asset_id, representation_id))
                     INHERITS (cornea.asset)");
    $db->do("CREATE TABLE cornea.asset_${tbl}_queue
                     (LIKE cornea.asset
                      EXCLUDING CONSTRAINTS
                      EXCLUDING INDEXES)");
    $db->do(<<SQL);
CREATE FUNCTION cornea.populate_asset_${tbl}_queue() RETURNS TRIGGER
  AS '
DECLARE
BEGIN
  INSERT INTO cornea.asset_${tbl}_queue
              (asset_id, service_id, representation_id, storage_location)
       VALUES (NEW.asset_id, NEW.service_id, NEW.representation_id,
               NEW.storage_location);
  RETURN NEW;
END
' LANGUAGE plpgsql
SQL
    $db->do("CREATE TRIGGER asset_${tbl}_queue_trigger
                AFTER INSERT OR UPDATE ON cornea.asset_${ptbl}
                FOR EACH ROW
                EXECUTE PROCEDURE cornea.populate_asset_${tbl}_queue()");
    $db->commit();
  };
  if (my $failure = $@) {
    eval { $db->rollback; };
    if ($failure =~ /"cornea.asset_$ptbl" does not exist/) {
      return (-1, "init-metanode first");
    }
    if ($failure =~ /already exists/) {
      return (-1, "already initialized");
    }
    die $failure;
  }
  return 0;
}
317
# Remove the replication objects that were created for peer $host.
#
# Table name parts are derived from $host (the peer) and from the
# configured 'sysinfo::nodename' (this node): dashes become underscores
# and everything from the first dot on is dropped.  In one transaction the
# queue trigger, its function, the queue table and the peer's asset table
# are dropped, in that order.
#
# Dies if $host does not resolve.  Returns 0 on success,
# (-1, "already perfomed") if the database reports a missing object, and
# (-1, $error) otherwise.
sub destroyAssetQueue {
  my ($self, $host) = @_;
  my $config = Cornea::Config->new();
  die "could not resolve $host\n" unless gethostbyname($host);
  my $tbl = $host;
  $tbl =~ s/\-/_/g;
  $tbl =~ s/\..*//;
  my $ptbl = $config->get('sysinfo::nodename');
  $ptbl =~ s/\-/_/g;
  $ptbl =~ s/\..*//;
  my $db = DBI->connect("dbi:Pg:host=localhost;dbname=cornea",
                        $config->get("DB::user"),
                        $config->get("DB::pass"),
                        { PrintError => 0, RaiseError => 1, AutoCommit => 1 },
                       );
  $db->begin_work();
  eval {
    my @teardown = (
      "DROP TRIGGER asset_${tbl}_queue_trigger ON cornea.asset_${ptbl}",
      "DROP FUNCTION cornea.populate_asset_${tbl}_queue()",
      "DROP TABLE cornea.asset_${tbl}_queue",
      "DROP TABLE cornea.asset_$tbl",
    );
    foreach my $statement (@teardown) {
      $db->do($statement);
    }
    $db->commit();
  };
  my $failure = $@;
  return 0 unless $failure;
  eval { $db->rollback; };
  return $failure =~ /does not exist/ ? (-1, "already perfomed")
                                      : (-1, $failure);
}
349
# Copy the complete asset table of peer $host into the local database.
#
# Table name parts are derived from $host (the peer) and from the
# configured 'sysinfo::nodename' (this node): dashes become underscores
# and everything from the first dot on is dropped.  One transaction is
# opened on the peer and one locally.  On the peer a cursor is declared
# over cornea.asset_<peer> and the queue kept there for this node is
# truncated; rows are then fetched 10000 at a time and inserted into the
# local cornea.asset_<peer> table.  Both transactions are committed at the
# end.
#
# Dies if $host does not resolve or a connection cannot be opened.
# Returns (0, "<n> copied") on success, or (-1, "<stage>:\n<error>") where
# <stage> names the step that was in progress when the error occurred.
sub initialAssetSynch {
  my ($self, $host) = @_;
  my $config = Cornea::Config->new();
  die "could not resolve $host\n" unless gethostbyname($host);
  my $tbl = $host;
  $tbl =~ s/\-/_/g;
  $tbl =~ s/\..*//;
  my $ptbl = $config->get('sysinfo::nodename');
  $ptbl =~ s/\-/_/g;
  $ptbl =~ s/\..*//;
  my $total_rows = 0;
  my $remote = DBI->connect("dbi:Pg:host=$host;dbname=cornea",
                            $config->get("DB::user"),
                            $config->get("DB::pass"),
                            { PrintError => 0, RaiseError => 1, AutoCommit => 1 },
                           );
  my $local = DBI->connect("dbi:Pg:host=localhost;dbname=cornea",
                           $config->get("DB::user"),
                           $config->get("DB::pass"),
                           { PrintError => 0, RaiseError => 1, AutoCommit => 1 },
                          );
  $remote->begin_work();
  $local->begin_work();
  # $problem always describes the step about to run, so the error report
  # can say which stage failed.
  my $problem;
  eval {
    $problem = "peer not locally initialized";
    my $insert = $local->prepare(
        "INSERT INTO cornea.asset_${tbl}
                    (asset_id, service_id,
                     representation_id, storage_location)
              VALUES (?,?,?,?::smallint[])");
    $problem = "remote not initialized";
    $remote->do("DECLARE initpull CURSOR FOR
               SELECT asset_id, service_id,
                      representation_id, storage_location
                 FROM cornea.asset_${tbl}");
    $problem = "remote peer not initialized";
    $remote->do("TRUNCATE cornea.asset_${ptbl}_queue");
    $problem = "error pulling remote data";
    my $fetch = $remote->prepare("FETCH FORWARD 10000 FROM initpull");
    # Keep fetching batches until a batch comes back empty.
    while (1) {
      my $batch_rows = 0;
      $fetch->execute();
      while (my @row = $fetch->fetchrow()) {
        $problem = "error insert local data";
        $insert->execute(@row);
        $batch_rows++;
        $problem = "error pulling remote data";
      }
      $total_rows += $batch_rows;
      last unless $batch_rows;
    }
    $remote->do("CLOSE initpull");
    $remote->commit();
    $local->commit();
  };
  if (my $failure = $@) {
    eval { $local->rollback; };
    eval { $remote->rollback; };
    return (-1, "$problem:\n$failure");
  }
  return (0, "$total_rows copied");
}
# Continuously pull queued asset changes from peer $host into the local
# database.
#
#   $host    - peer to pull from; must resolve
#   $timeout - seconds to sleep between rounds (defaults to 1)
#
# Table name parts are derived from $host (the peer) and from the
# configured 'sysinfo::nodename' (this node): dashes become underscores
# and everything from the first dot on is dropped.  Each round opens a
# transaction on both sides, deletes every row from the queue the peer
# keeps for this node, and for each returned row replaces the matching
# row locally (delete from cornea.asset, insert into cornea.asset_<peer>).
# The local transaction is committed before the remote one.
#
# The loop only ends on error, returning (-1, "<n> compied.\n<error>").
# Dies if $host does not resolve or a connection cannot be opened.
sub pullAssetTable {
  my $self = shift;
  my $config = Cornea::Config->new();
  my $host = shift;
  my $timeout = shift || 1;
  gethostbyname($host) || die "could not resolve $host\n";
  (my $tbl = $host) =~ s/\-/_/g;
  $tbl =~ s/\..*//;
  my $phost = $config->get('sysinfo::nodename');
  (my $ptbl = $phost) =~ s/\-/_/g;
  $ptbl =~ s/\..*//;
  my $total_rows = 0;
  my $dbh = DBI->connect("dbi:Pg:host=$host;dbname=cornea",
                         $config->get("DB::user"),
                         $config->get("DB::pass"),
                         { PrintError => 0, RaiseError => 1, AutoCommit => 1 },
                        );
  my $ldbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea",
                         $config->get("DB::user"),
                         $config->get("DB::pass"),
                         { PrintError => 0, RaiseError => 1, AutoCommit => 1 },
                        );
  while(1) {
    $dbh->begin_work();
    $ldbh->begin_work();
    eval {
      # Name the returned columns instead of using RETURNING *: the rows
      # are bound positionally below, so the order must not depend on how
      # the remote queue table happens to be laid out.
      my $from = $dbh->prepare("DELETE FROM cornea.asset_${ptbl}_queue
                 RETURNING asset_id, service_id,
                           representation_id, storage_location");
      my $todel = $ldbh->prepare("DELETE FROM cornea.asset
                 WHERE asset_id = ? and service_id = ? and representation_id = ?");
      my $toins = $ldbh->prepare(
          "INSERT INTO cornea.asset_${tbl}
                      (asset_id, service_id,
                       representation_id, storage_location)
                VALUES (?,?,?,?::smallint[])");
      $from->execute();
      while(my @row = $from->fetchrow()) {
        # Remove any existing copy first so the insert acts as an upsert.
        $todel->execute(@row[0..2]);
        $toins->execute(@row);
        $total_rows++;
      }
      # Local first: if the remote commit then fails, the queue rows
      # survive and are simply applied again next round.
      $ldbh->commit;
      $dbh->commit;
    };
    if($@) {
      my $err = $@;
      eval { $ldbh->rollback; };
      eval { $dbh->rollback; };
      return (-1, "$total_rows compied.\n$err");
    }
    sleep($timeout);
  }
  # Not reached: the loop above only exits through the error return.
  return (0, "$total_rows copied");
}
467 1;
Note: See TracBrowser for help on using the browser.