root/trunk/perl/lib/Cornea/RecallTable.pm

Revision 25, 14.6 kB (checked in by jesus, 5 years ago)

make this quiet for the creates... no notices.

Line 
1 package Cornea::RecallTable;
2 use strict;
3 use Cornea::Config;
4 use Cornea::Utils;
5 use DBI;
6
# Open a database connection and store it in $self->{dbh}.
#
# The DSNs configured under "DB::dsn" are shuffled and tried in turn; the
# first one that accepts a connection is used.  DSNs that failed before a
# later one succeeded are reported on STDERR.
#
# Dies if no DSN is configured or if every DSN fails (the message lists
# each DSN with its error).  Returns the new handle.
sub __connect {
  my $self = shift;
  my $config = Cornea::Config->new();
  my $dsns = $config->get_list("DB::dsn");
  die "no DB::dsn configured\n" unless $dsns && @$dsns;
  Cornea::Utils::shuffle($dsns);

  my $dbh;
  my @failures;
  foreach my $dsn (@$dsns) {
    $dbh = eval {
      DBI->connect($dsn,
                   $config->get("DB::user"),
                   $config->get("DB::pass"),
                   { PrintError => 0, RaiseError => 1 },
                  );
    };
    last if $dbh;
    # Remember why this DSN failed and fall through to the next one
    # instead of aborting the whole failover loop.
    my $err = $@ || "connect failed";
    chomp $err;
    push @failures, "$dsn: $err\n";
  }

  die join('', @failures) unless $dbh;
  print STDERR @failures if @failures;
  return $self->{dbh} = $dbh;
}
# Discard the current database handle and establish a new connection
# through __connect().  Returns whatever __connect() returns.
sub __reconnect {
  my ($self) = @_;
  undef $self->{dbh};
  return $self->__connect();
}
# Constructor.  Returns an empty object blessed into the invoking class;
# no database connection is opened here.
sub new {
  my ($class) = @_;
  my $self = {};
  return bless $self, $class;
}
38
# Record an asset representation and the storage nodes that hold it.
#
#   $serviceId, $assetId, $repId - identify the asset representation
#   $snl                         - Cornea::StorageNodeList of holding nodes
#
# Calls the make_asset() database function.  If the call fails, the
# connection is re-established once and the call retried; a second
# failure is rethrown.  Dies with 'bad parameters' if $snl is not a
# Cornea::StorageNodeList.  Returns 1 on success.
sub insert {
  my $self = shift;
  my ($serviceId, $assetId, $repId, $snl) = @_;
  # UNIVERSAL::isa is safe on undef and unblessed values.
  die 'bad parameters' unless UNIVERSAL::isa($snl, 'Cornea::StorageNodeList');
  # Build a PostgreSQL array literal such as {1,2,3}.
  # NOTE(review): the accessor name "storeagenodeid" looks like a typo for
  # "storagenodeid" -- confirm against Cornea::StorageNode.
  my $snl_arr = '{' . join(',', map { $_->storeagenodeid() }
                                    ($snl->items())) . '}';
  foreach my $attempt (1, 2) {
    eval {
      my $sth = $self->{dbh}->prepare("select make_asset(?,?,?,?::int[])");
      $sth->execute($serviceId, $assetId, $repId, $snl_arr);
      $sth->finish();
    };
    last unless $@;
    die $@ if $attempt == 2;
    $self->__reconnect();
  }
  return 1;
}
58
# Look up where an asset representation is stored.
#
#   $serviceId, $assetId, $repId - identify the asset representation
#
# Returns a Cornea::StorageNodeList with one Cornea::StorageNode per row
# returned by get_asset_location().  If the query fails, the connection is
# re-established once and the query retried; a second failure is rethrown.
sub find {
  my $self = shift;
  my ($serviceId, $assetId, $repId) = @_;
  my $C;
  foreach my $attempt (1, 2) {
    eval {
      # Prepare inside the eval so that a missing or dead handle goes
      # through the reconnect path, and so the retry uses a statement
      # bound to the fresh connection rather than the old one.
      my $sth = $self->{dbh}->prepare("select get_asset_location(?,?,?)");
      $C = Cornea::StorageNodeList->new();
      $sth->execute($serviceId, $assetId, $repId);
      while(my $node = $sth->fetchrow_hashref()) {
        $C->add(Cornea::StorageNode->new_from_row($node));
      }
      $sth->finish();
    };
    last unless $@;
    die $@ if $attempt == 2;
    $self->__reconnect();
  }
  return $C;
}
80
# Fetch the storage nodes that are in a given state.
#
#   $type - state passed to get_storage_nodes_by_state()
#
# Returns a Cornea::StorageNodeList holding one Cornea::StorageNode per
# row.  A failed query triggers one reconnect and one retry; a second
# failure is rethrown to the caller.
sub getNodes {
  my ($self, $type) = @_;
  my $nodes;
  foreach my $attempt (1, 2) {
    eval {
      $nodes = Cornea::StorageNodeList->new();
      my $query = $self->{dbh}->prepare("select * from get_storage_nodes_by_state(?)");
      $query->execute($type);
      while (my $rec = $query->fetchrow_hashref()) {
        $nodes->add(Cornea::StorageNode->new_from_row($rec));
      }
      $query->finish();
    };
    last unless $@;
    die $@ if $attempt > 1;
    $self->__reconnect();
  }
  return $nodes;
}
102
# Register or update a storage node on every configured database using a
# two-phase commit, so that all databases agree on the storage node id.
#
#   $ip   - address of the storage node
#   $attr - hashref with state, total_storage, used_storage, location, fqdn
#
# set_storage_node() is run inside a transaction on each database.  The id
# returned by the first database is passed to the remaining ones, and any
# disagreement aborts the operation.  All transactions are then prepared
# and finally committed.
#
# On any error every handle is rolled back individually (prepared or not)
# and disconnected, and the original error is rethrown.  Note that a
# failure part-way through the final "commit prepared" loop cannot undo
# databases that have already committed.
#
# Returns the storage node id; dies if none was obtained.
sub _2pc_add_storage {
  my $ip = shift;
  my $attr = shift;
  my $storage_node_id;
  my $config = Cornea::Config->new();
  my $dsns = $config->get_list("DB::dsn");
  my @dbh;
  eval {
    foreach my $dsn (@$dsns) {
      my $dbh = DBI->connect($dsn,
                   $config->get("DB::user"),
                   $config->get("DB::pass"),
                   { PrintError => 0, RaiseError => 1, AutoCommit => 1 }
                  );
      # Track the handle before starting work so cleanup always sees it.
      push @dbh, $dbh;
      $dbh->begin_work();
    }
    foreach my $dbh (@dbh) {
      eval {
        my $sth = $dbh->prepare("select set_storage_node(?,?,?,?,?,?,?)");
        $sth->execute($attr->{state},
                      $attr->{total_storage}, $attr->{used_storage},
                      $attr->{location}, $attr->{fqdn}, $ip, $storage_node_id);
        my ($returned_id) = $sth->fetchrow();
        $storage_node_id ||= $returned_id;
        $sth->finish();
        die "Storage node trickery! (this should never happen).\n"
          if($storage_node_id != $returned_id);
      };
      if ($@) {
        die "location must be specified for first-time update\n"
          if $@ =~ /null value in column "location"/;
        die $@;
      }
    }
    foreach my $dbh (@dbh) { $dbh->do("prepare transaction 'cornea_node'"); }
    foreach my $dbh (@dbh) { $dbh->do("commit prepared 'cornea_node'"); }
  };
  if ($@) {
    my $real_error = $@;
    foreach my $dbh (@dbh) {
      # Each handle gets its own eval: one failing rollback must not stop
      # the others from being cleaned up.  A handle that never reached
      # "prepare transaction" needs a plain rollback instead.
      eval { $dbh->do("rollback prepared 'cornea_node'"); };
      eval { $dbh->rollback; } if $@;
      eval { $dbh->disconnect; };
    }
    die $real_error;
  }
  foreach my $dbh (@dbh) { $dbh->disconnect; }
  die "set_storage_node did not return a storage node id\n"
    unless($storage_node_id);
  return $storage_node_id;
}
# Validate and store a storage node's status report.
#
#   $ip   - address of the storage node
#   $attr - hashref: state, total_storage, used_storage and, optionally,
#           location (dc/cage/row/rack/pdu) and fqdn
#
# Dies if any attribute fails validation.  When location or fqdn is
# supplied the update goes through _2pc_add_storage() and the storage node
# id is returned.  Otherwise the update is applied best-effort to each
# configured database in turn: every DSN gets two attempts, failures are
# printed to STDERR, and 0 is returned.
sub updateNode {
  my $self = shift;
  my $ip = shift;
  my $attr = shift;
  die "bad state"
    unless $attr->{state} =~ /^(?:open|closed|offline|decommissioned)$/;
  # used_storage may legitimately be 0 on an empty node; total may not.
  die "storage must be a number"
    unless $attr->{total_storage} =~ /^[1-9]\d*$/ and
           $attr->{used_storage} =~ /^(?:0|[1-9]\d*)$/;
  die "locaion must be dc/cage/row/rack/pdu"
    unless !defined($attr->{location}) or
           $attr->{location} =~ /^[^\/]+(?:\/[^\/]+){4}$/;
  die "fqdn must not be blank"
    unless !defined($attr->{fqdn}) or length($attr->{fqdn});

  if(defined($attr->{location}) || defined($attr->{fqdn})) {
    return _2pc_add_storage($ip, $attr);
  }

  my $config = Cornea::Config->new();
  my $dsns = $config->get_list("DB::dsn");
  foreach my $dsn (@$dsns) {
    foreach my $attempt (1, 2) {
      my $dbh;
      eval {
        # Talk to this DSN's own connection so every database receives
        # the update, not just whichever one $self->{dbh} points at.
        $dbh = DBI->connect($dsn,
                     $config->get("DB::user"),
                     $config->get("DB::pass"),
                     { PrintError => 0, RaiseError => 1, AutoCommit => 1 }
                    );
        my $sth = $dbh->prepare("select set_storage_node(?,?,?,?,?,?,?)");
        $sth->execute($attr->{state},
                      $attr->{total_storage}, $attr->{used_storage},
                      $attr->{location}, $attr->{fqdn}, $ip, undef);
        $sth->finish();
      };
      my $err = $@;
      eval { $dbh->disconnect; } if $dbh;
      last unless $err;
      print STDERR $err if $attempt == 2;
    }
  }
  return 0;
}
192
# Load the definition of one representation of a service.
#
#   $serviceId, $repId - identify the representation
#
# Reads the first row returned by get_representation() and hands it to
# Cornea::RepresentationInfo->new_from_row(), whose result is returned.
# A failed query triggers one reconnect and one retry; a second failure
# is rethrown.
sub repInfo {
  my ($self, $serviceId, $repId) = @_;
  my $rep_row;
  foreach my $attempt (1, 2) {
    eval {
      my $query = $self->{dbh}->prepare("select * from get_representation(?,?)");
      $query->execute($serviceId, $repId);
      $rep_row = $query->fetchrow_hashref();
      $query->finish();
    };
    last unless $@;
    die $@ if $attempt > 1;
    $self->__reconnect();
  }
  return Cornea::RepresentationInfo->new_from_row($rep_row);
}
211
# Load the representations that depend on a given representation.
#
#   $serviceId, $repId - identify the parent representation
#
# Returns a list of Cornea::RepresentationInfo objects, one per row
# returned by get_representation_dependents().  A failed query triggers
# one reconnect and one retry (starting from an empty list again); a
# second failure is rethrown.
sub repInfoDependents {
  my ($self, $serviceId, $repId) = @_;
  my @dependents;
  foreach my $attempt (1, 2) {
    eval {
      @dependents = ();
      my $query = $self->{dbh}->prepare("select * from get_representation_dependents(?,?)");
      $query->execute($serviceId, $repId);
      while (my $rec = $query->fetchrow_hashref()) {
        push @dependents, Cornea::RepresentationInfo->new_from_row($rec);
      }
      $query->finish();
    };
    last unless $@;
    die $@ if $attempt > 1;
    $self->__reconnect();
  }
  return @dependents;
}
233
# Create this node's own asset table on the local database.
#
# The table name is derived from the configured "sysinfo::nodename":
# dashes become underscores and everything from the first dot on is
# dropped.  In one transaction this creates cornea.asset_<name>
# (inheriting from cornea.asset) and (re)defines cornea.make_asset() so
# that it inserts into that table.
#
# Returns 0 on success, or (-1, message) on failure; the message is
# "already initialized" when the error text contains "already exists".
sub initAssetTable {
  my ($self) = @_;
  my $cfg = Cornea::Config->new();
  my $tbl = $cfg->get('sysinfo::nodename');
  $tbl =~ s/\-/_/g;
  $tbl =~ s/\..*//;
  my %connect_attr = (PrintError => 0, RaiseError => 1, AutoCommit => 1);
  my $dbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea",
                         $cfg->get("DB::user"),
                         $cfg->get("DB::pass"),
                         \%connect_attr);
  # Keep CREATE TABLE notices out of the output.
  $dbh->do("set client_min_messages = 'WARNING'");
  $dbh->begin_work();
  my @ddl = (
    "CREATE TABLE cornea.asset_$tbl
                    (CONSTRAINT asset_${tbl}_pkey
                        PRIMARY KEY (service_id, asset_id, representation_id))
                     INHERITS (cornea.asset)",
    "CREATE OR REPLACE FUNCTION cornea.make_asset(in_service_id integer, in_asset_id bigint, in_repid integer, in_storage_location smallint[]) RETURNS void AS 'delete from asset where service_id = \$1 and asset_id = \$2 and representation_id = \$3; insert into asset_${tbl} (service_id, asset_id, representation_id, storage_location) VALUES (\$1, \$2, \$3, \$4);' LANGUAGE sql",
  );
  eval {
    foreach my $statement (@ddl) {
      $dbh->do($statement);
    }
    $dbh->commit();
  };
  return 0 unless $@;

  my $err = $@;
  eval { $dbh->rollback; };
  if ($err =~ /already exists/) {
    return (-1, "already initialized");
  }
  return (-1, $err);
}
263
# Prepare the local database for replicating assets to a peer.
#
#   $host - peer host name; must resolve, otherwise this dies
#
# Table-name fragments are derived from $host (peer) and from the
# configured "sysinfo::nodename" (local): dashes become underscores and
# everything from the first dot on is dropped.  In one transaction this
# creates cornea.asset_<peer>, cornea.asset_<peer>_queue, a trigger
# function that copies new rows into that queue, and an AFTER INSERT OR
# UPDATE trigger on cornea.asset_<local> that calls it.
#
# Returns 0 on success.  Returns (-1, "init-metanode first") when the
# local asset table is missing and (-1, "already initialized") when the
# objects already exist; any other error is rethrown.
sub setupAssetQueue {
  my ($self, $host) = @_;
  my $cfg = Cornea::Config->new();
  gethostbyname($host) || die "could not resolve $host\n";
  my $tbl = $host;
  $tbl =~ s/\-/_/g;
  $tbl =~ s/\..*//;
  my $ptbl = $cfg->get('sysinfo::nodename');
  $ptbl =~ s/\-/_/g;
  $ptbl =~ s/\..*//;
  my %connect_attr = (PrintError => 0, RaiseError => 1, AutoCommit => 1);
  my $dbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea",
                         $cfg->get("DB::user"),
                         $cfg->get("DB::pass"),
                         \%connect_attr);
  # Keep CREATE TABLE notices out of the output.
  $dbh->do("set client_min_messages = 'WARNING'");
  $dbh->begin_work();
  eval {
    $dbh->do("CREATE TABLE cornea.asset_$tbl
                    (CONSTRAINT asset_${tbl}_uc
                        PRIMARY KEY (service_id, asset_id, representation_id))
                     INHERITS (cornea.asset)");
    $dbh->do("CREATE TABLE cornea.asset_${tbl}_queue
                     (LIKE cornea.asset
                      EXCLUDING CONSTRAINTS
                      EXCLUDING INDEXES)");
    $dbh->do(<<SQL);
CREATE FUNCTION cornea.populate_asset_${tbl}_queue() RETURNS TRIGGER
  AS '
DECLARE
BEGIN
  INSERT INTO cornea.asset_${tbl}_queue
              (asset_id, service_id, representation_id, storage_location)
       VALUES (NEW.asset_id, NEW.service_id, NEW.representation_id,
               NEW.storage_location);
  RETURN NEW;
END
' LANGUAGE plpgsql
SQL
    $dbh->do("CREATE TRIGGER asset_${tbl}_queue_trigger
                AFTER INSERT OR UPDATE ON cornea.asset_${ptbl}
                FOR EACH ROW
                EXECUTE PROCEDURE cornea.populate_asset_${tbl}_queue()");
    $dbh->commit();
  };
  return 0 unless $@;

  my $err = $@;
  eval { $dbh->rollback; };
  if ($err =~ /"cornea.asset_$ptbl" does not exist/) {
    return (-1, "init-metanode first");
  }
  if ($err =~ /already exists/) {
    return (-1, "already initialized");
  }
  die $err;
}
319
# Remove the replication objects for a peer from the local database.
#
#   $host - peer host name; must resolve, otherwise this dies
#
# In one transaction this drops the queue trigger on the local asset
# table, the trigger function, the peer's queue table and the peer's
# asset table.  Names are derived from $host and from the configured
# "sysinfo::nodename" (dashes become underscores, everything from the
# first dot on is dropped).
#
# Returns 0 on success, or (-1, message) on failure; the message is
# "already perfomed" when the error text contains "does not exist".
sub destroyAssetQueue {
  my ($self, $host) = @_;
  my $cfg = Cornea::Config->new();
  gethostbyname($host) || die "could not resolve $host\n";
  my $tbl = $host;
  $tbl =~ s/\-/_/g;
  $tbl =~ s/\..*//;
  my $ptbl = $cfg->get('sysinfo::nodename');
  $ptbl =~ s/\-/_/g;
  $ptbl =~ s/\..*//;
  my %connect_attr = (PrintError => 0, RaiseError => 1, AutoCommit => 1);
  my $dbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea",
                         $cfg->get("DB::user"),
                         $cfg->get("DB::pass"),
                         \%connect_attr);
  $dbh->begin_work();
  # Statements run in this order: trigger, function, queue table, table.
  my @teardown = (
    "DROP TRIGGER asset_${tbl}_queue_trigger ON cornea.asset_${ptbl}",
    "DROP FUNCTION cornea.populate_asset_${tbl}_queue()",
    "DROP TABLE cornea.asset_${tbl}_queue",
    "DROP TABLE cornea.asset_$tbl",
  );
  eval {
    foreach my $statement (@teardown) {
      $dbh->do($statement);
    }
    $dbh->commit();
  };
  return 0 unless $@;

  my $err = $@;
  eval { $dbh->rollback; };
  if ($err =~ /does not exist/) {
    return (-1, "already perfomed");
  }
  return (-1, $err);
}
351
# Copy a peer's complete asset table into the local database.
#
#   $host - peer host name; must resolve, otherwise this dies
#
# Opens one connection to the peer and one to localhost, each inside a
# transaction.  The peer's cornea.asset_<peer> rows are read through a
# cursor in batches of 10000 and inserted into the local
# cornea.asset_<peer>; the peer's queue for this node
# (cornea.asset_<local>_queue) is truncated in the same remote
# transaction.  The remote transaction is committed first, then the
# local one.
#
# Returns (0, "<n> copied") on success.  On failure both transactions are
# rolled back and (-1, "<stage>:\n<error>") is returned, where <stage>
# names the step that was in progress.
sub initialAssetSynch {
  my ($self, $host) = @_;
  my $cfg = Cornea::Config->new();
  gethostbyname($host) || die "could not resolve $host\n";
  my $tbl = $host;
  $tbl =~ s/\-/_/g;
  $tbl =~ s/\..*//;
  my $ptbl = $cfg->get('sysinfo::nodename');
  $ptbl =~ s/\-/_/g;
  $ptbl =~ s/\..*//;
  my $total_rows = 0;

  # Both connections use the same credentials and attributes.
  my $connect = sub {
    my ($dsn) = @_;
    return DBI->connect($dsn,
                        $cfg->get("DB::user"),
                        $cfg->get("DB::pass"),
                        { PrintError => 0, RaiseError => 1, AutoCommit => 1 },
                       );
  };
  my $dbh = $connect->("dbi:Pg:host=$host;dbname=cornea");
  my $ldbh = $connect->("dbi:Pg:host=localhost;dbname=cornea");
  $dbh->begin_work();
  $ldbh->begin_work();

  # $problem always names the step about to run, so that a failure can be
  # reported with the stage it happened in.
  my $problem;
  eval {
    $problem = "peer not locally initialized";
    my $local_insert = $ldbh->prepare(
        "INSERT INTO cornea.asset_${tbl}
                    (asset_id, service_id,
                     representation_id, storage_location)
              VALUES (?,?,?,?::smallint[])");
    $problem = "remote not initialized";
    $dbh->do("DECLARE initpull CURSOR FOR
               SELECT asset_id, service_id,
                      representation_id, storage_location
                 FROM cornea.asset_${tbl}");
    $problem = "remote peer not initialized";
    $dbh->do("TRUNCATE cornea.asset_${ptbl}_queue");
    $problem = "error pulling remote data";
    my $fetch = $dbh->prepare("FETCH FORWARD 10000 FROM initpull");
    while (1) {
      my $batch_rows = 0;
      $fetch->execute();
      while (my @rec = $fetch->fetchrow()) {
        $problem = "error insert local data";
        $local_insert->execute(@rec);
        $batch_rows++;
        $problem = "error pulling remote data";
      }
      $total_rows += $batch_rows;
      # An empty batch means the cursor is exhausted.
      last unless $batch_rows;
    }
    $dbh->do("CLOSE initpull");
    $dbh->commit();
    $ldbh->commit();
  };
  return (0, "$total_rows copied") unless $@;

  my $err = $@;
  eval { $ldbh->rollback; };
  eval { $dbh->rollback; };
  return (-1, "$problem:\n$err");
}
# Continuously drain a peer's replication queue into the local database.
#
#   $host    - peer host name; must resolve, otherwise this dies
#   $timeout - seconds to sleep between passes (defaults to 1)
#
# Each pass runs one transaction on the peer and one locally: all rows
# are deleted from the peer's cornea.asset_<local>_queue and, for each
# one, any matching row is removed from the local cornea.asset before the
# row is inserted into the local cornea.asset_<peer>.  The local
# transaction is committed before the remote one.
#
# The loop only ends on an error, in which case both transactions are
# rolled back and (-1, "<n> compied.\n<error>") is returned.
sub pullAssetTable {
  my $self = shift;
  my $config = Cornea::Config->new();
  my $host = shift;
  my $timeout = shift || 1;
  gethostbyname($host) || die "could not resolve $host\n";
  (my $tbl = $host) =~ s/\-/_/g;
  $tbl =~ s/\..*//;
  my $phost = $config->get('sysinfo::nodename');
  (my $ptbl = $phost) =~ s/\-/_/g;
  $ptbl =~ s/\..*//;
  my $total_rows = 0;
  my $dbh = DBI->connect("dbi:Pg:host=$host;dbname=cornea",
                         $config->get("DB::user"),
                         $config->get("DB::pass"),
                         { PrintError => 0, RaiseError => 1, AutoCommit => 1 },
                        );
  my $ldbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea",
                         $config->get("DB::user"),
                         $config->get("DB::pass"),
                         { PrintError => 0, RaiseError => 1, AutoCommit => 1 },
                        );
  while(1) {
    $dbh->begin_work();
    $ldbh->begin_work();
    eval {
      # Name the returned columns explicitly: the rows are bound by
      # position below, so the order must not depend on how the queue
      # table's columns happen to be laid out.
      my $from = $dbh->prepare(
          "DELETE FROM cornea.asset_${ptbl}_queue
             RETURNING asset_id, service_id,
                       representation_id, storage_location");
      my $todel = $ldbh->prepare("DELETE FROM cornea.asset
                 WHERE asset_id = ? and service_id = ? and representation_id = ?");
      my $toins = $ldbh->prepare(
          "INSERT INTO cornea.asset_${tbl}
                      (asset_id, service_id,
                       representation_id, storage_location)
                VALUES (?,?,?,?::smallint[])");
      $from->execute();
      while(my @row = $from->fetchrow()) {
        # The first three columns form the key of the row to replace.
        $todel->execute(@row[0..2]);
        $toins->execute(@row);
        $total_rows++;
      }
      $ldbh->commit;
      $dbh->commit;
    };
    if($@) {
      my $err = $@;
      eval { $ldbh->rollback; };
      eval { $dbh->rollback; };
      return (-1, "$total_rows compied.\n$err");
    }
    sleep($timeout);
  }
  # Not reached: the loop above only exits through the error return.
  return (0, "$total_rows copied");
}
469 1;
Note: See TracBrowser for help on using the browser.