root/trunk/perl/lib/Cornea/RecallTable.pm

Revision 56, 18.0 kB (checked in by jesus, 5 years ago)

update a cache with the asset locations

Line 
1 package Cornea::RecallTable;
2 use strict;
3 use Cornea::Config;
4 use Cornea::Utils;
5 use Memcached::libmemcached qw(:memcached_behavior :memcached_server_distribution
6                                memcached_create memcached_behavior_set
7                                memcached_server_add memcached_set);
8 use YAML;
9 use DBI;
10
11 sub __connect {
12   my $self = shift;
13   my $config = Cornea::Config->new();
14   my $dbh;
15   my $failed_err = undef;
16   my $dsns = $config->get_list("DB::dsn");
17   Cornea::Utils::shuffle($dsns);
18   foreach my $dsn (@$dsns) {
19     eval {
20       $dbh = DBI->connect($dsn,
21                           $config->get("DB::user"),
22                           $config->get("DB::pass"),
23                           { PrintError => 0, RaiseError => 1 },
24                          );
25     };
26     die "$dsn: $@\n" if $@;
27     last unless $@;
28   }
29   print STDERR "$failed_err\n" if $failed_err;
30   $self->{dbh} = $dbh;
31 }
32 sub __reconnect {
33   my $self = shift;
34   $self->{dbh} = undef;
35   $self->__connect();
36 }
37 sub new {
38   my $class = shift;
39   my $self = bless { }, $class;
40   $self;
41 }
42
43 sub insert {
44   my $self = shift;
45   my ($serviceId, $assetId, $repId, $snl) = @_;
46   my $tried = 0;
47   die 'bad parameters' unless UNIVERSAL::isa($snl, 'Cornea::StorageNodeList');
48   my $snl_arr = '{' . join(',', map { $_->id() }
49                                     ($snl->items())) . '}';
50  again:
51   eval {
52     my $sth = $self->{dbh}->prepare("select make_asset(?,?,?,?::smallint[])");
53     $sth->execute($serviceId, $assetId, $repId, $snl_arr);
54     $sth->finish();
55   };
56   if ($@) {
57     unless ($tried++) { $self->__reconnect();  goto again; }
58     die $@ if $@;
59   }
60   $self->asset_cache_replace(@_);
61   return 1;
62 }
63
64 sub find {
65   my $self = shift;
66   my ($serviceId, $assetId, $repId) = @_;
67   my $sth = $self->{dbh}->prepare("select * from get_asset_location(?,?,?)");
68   my $tried = 0;
69   my $C;
70  again:
71   eval {
72     $C = Cornea::StorageNodeList->new();
73     $sth->execute($serviceId, $assetId, $repId);
74     while(my $node = $sth->fetchrow_hashref()) {
75       $C->add(Cornea::StorageNode->new_from_row($node));
76     }
77     $sth->finish();
78   };
79   if ($@) {
80     unless ($tried++) { $self->__reconnect();  goto again; }
81     die $@ if $@;
82   }
83   return $C;
84 }
85
86 sub getNodes {
87   my $self = shift;
88   my $type = shift;
89   my $tried = 0;
90   my $snl;
91   my $bind = undef;
92   if (defined($type)) {
93     $bind = (ref $type eq 'ARRAY') ? ('{'.join(',', @$type).'}') : "{$type}";
94   }
95  again:
96   eval {
97     $snl = Cornea::StorageNodeList->new();
98     my $sth = $self->{dbh}->prepare("select * from get_storage_nodes(?::storagestate[])");
99     $sth->execute($bind);
100     while(my $row = $sth->fetchrow_hashref()) {
101       $snl->add(Cornea::StorageNode->new_from_row($row));
102     }
103     $sth->finish();
104   };
105   if ($@) {
106     unless ($tried++) { $self->__reconnect();  goto again; }
107     die $@ if $@;
108   }
109   return $snl;
110 }
111
112 sub _2pc_generic {
113   my $self = shift;
114   my $code = shift;
115   my $config = Cornea::Config->new();
116   my $dsns = $config->get_list("DB::dsn");
117   my $rv = undef;
118   my $named_txn = "cornea_$$";
119   my @dbh = map {
120     my $dbh = DBI->connect($_,
121                  $config->get("DB::user"),
122                  $config->get("DB::pass"),
123                  { PrintError => 0, RaiseError => 1, AutoCommit => 1 }
124                 );
125     $dbh->begin_work();
126     [$_, $dbh];
127   } @$dsns;
128   eval {
129     foreach (@dbh) {
130       &$code($_->[0], $_->[1], \$rv);
131     }
132     foreach (@dbh) { $_->[1]->do("prepare transaction '$named_txn'"); }
133     foreach (@dbh) { $_->[1]->do("commit prepared '$named_txn'"); }
134   };
135   if ($@) {
136     my $real_error = $@;
137     $rv = undef;
138     eval { foreach (@dbh) { $_->[1]->do("rollback prepared '$named_txn'"); } };
139     die $real_error;
140   }
141   foreach (@dbh) { $_->[1]->disconnect; }
142   return $rv;
143 }
144      
145 sub updateNode {
146   my $self = shift;
147   my $ip = shift;
148   my $attr = shift;
149   my $config = Cornea::Config->new();
150   die "bad state"
151     unless $attr->{state} =~ /^(?:open|closed|offline|decommissioned)$/;
152   die "storage must be a number"
153     unless $attr->{total_storage} =~ /^[1-9]\d*$/ and
154            $attr->{used_storage} =~ /^[1-9]\d*$/;
155   die "locaion must be dc/cage/row/rack/pdu"
156     unless !defined($attr->{location}) or
157            $attr->{location} =~ /^[^\/]+(?:\/[^\/]+){4}$/;
158   die "fqdn must not be blank"
159     unless !defined($attr->{fqdn}) or length($attr->{fqdn});
160
161   if(defined($attr->{location}) || defined($attr->{fqdn})) {
162     return $self->_2pc_generic(sub {
163       eval {
164         my $dsn = shift;
165         my $dbh = shift;
166         my $rv = shift;
167         my $sth = $dbh->prepare("select set_storage_node(?,?,?,?,?,?,?)");
168         my $storage_node_id = $$rv;
169         $sth->execute($attr->{state},
170                       $attr->{total_storage}, $attr->{used_storage},
171                       $attr->{location}, $attr->{fqdn}, $ip, $storage_node_id);
172         my ($returned_storade_node_id) = $sth->fetchrow();
173         $$rv ||= $returned_storade_node_id;
174         $sth->finish();
175         die "Storage node trickery! (this should never happen).\n"
176           if($$rv != $returned_storade_node_id);
177       };
178       if ($@) {
179         die "location and fqdn must be specified for first-time update\n"
180           if $@ =~ /null value in column "(?:location|fqdn)"/;
181         die $@ if $@;
182       }
183     });
184   }
185   my $dsns = $config->get_list("DB::dsn");
186   foreach (@$dsns) {
187     my $dbh = DBI->connect($_,
188                  $config->get("DB::user"),
189                  $config->get("DB::pass"),
190                  { PrintError => 0, RaiseError => 1, AutoCommit => 1 }
191                 );
192     my $tried = 0;
193    again:
194     eval {
195       print STDERR "$_ -> updating node info\n" if $main::DEBUG;
196       my $sth = $dbh->prepare("select set_storage_node(?,?,?,?,?,?,?)");
197       $sth->execute($attr->{state},
198                     $attr->{total_storage}, $attr->{used_storage},
199                     $attr->{location}, $attr->{fqdn}, $ip, undef);
200       $sth->finish();
201     };
202     if ($@) {
203       my $err = $@;
204       $err = "failed: no match by IP\n"
205         if $@ =~ /null value in column "(?:location|fqdn)"/;
206       print STDERR "\t$@\n" if $main::DEBUG;
207       unless ($tried++) { $self->__reconnect();  goto again; }
208       print STDERR "$_: $err";
209     }
210   }
211   return 0;
212 }
213
214 sub updateRepInfo {
215   my $self = shift;
216   my $service_id = shift;
217   my $rep_id = shift;
218   my $attr = shift;
219   eval "use $attr->{class};";
220   return (-1, "Cannot load $attr->{class}") if $@;
221   return $self->_2pc_generic(sub {
222     my $dsn = shift;
223     my $dbh = shift;
224     my $rv = shift;
225     my $sth = $dbh->prepare("select make_representation(?,?,?,?,?,?,?)");
226     $sth->execute($service_id, $rep_id, $attr->{name},
227                   $attr->{distance}, $attr->{count},
228                   $attr->{parent}, $attr->{class});
229     $sth->finish();
230   });
231 }
232
233 sub repInfo {
234   my $self = shift;
235   my ($serviceId, $repId) = @_;
236   my $tried = 0;
237   my $row;
238  again:
239   eval {
240     my $sth = $self->{dbh}->prepare("select * from get_representation(?,?)");
241     $sth->execute($serviceId, $repId);
242     $row = $sth->fetchrow_hashref();
243     $sth->finish();
244   };
245   if ($@) {
246     unless ($tried++) { $self->__reconnect();  goto again; }
247     die $@ if $@;
248   }
249   return $row ? Cornea::RepresentationInfo->new_from_row($row) : undef;
250 }
251
252 sub repInfoDependents {
253   my $self = shift;
254   my ($serviceId, $repId) = @_;
255   my $tried = 0;
256   my @deps;
257  again:
258   eval {
259     @deps = ();
260     my $sth = $self->{dbh}->prepare("select * from get_representation_dependents(?,?)");
261     $sth->execute($serviceId, $repId);
262     while(my $row = $sth->fetchrow_hashref()) {
263       push @deps, Cornea::RepresentationInfo->new_from_row($row);
264     }
265     $sth->finish();
266   };
267   if ($@) {
268     unless ($tried++) { $self->__reconnect();  goto again; }
269     die $@ if $@;
270   }
271   return @deps;
272 }
273
274 sub initAssetTable {
275   my $self = shift;
276   my $config = Cornea::Config->new();
277   my $host = $config->get('sysinfo::nodename');
278   (my $tbl = $host) =~ s/\-/_/g;
279   $tbl =~ s/\..*//;
280   my $dbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea",
281                          $config->get("DB::user"),
282                          $config->get("DB::pass"),
283                          { PrintError => 0, RaiseError => 1, AutoCommit => 1 },
284                         );
285   $dbh->do("set client_min_messages = 'WARNING'");
286   $dbh->begin_work();
287   eval {
288     $dbh->do("CREATE TABLE cornea.asset_$tbl
289                     (CONSTRAINT asset_${tbl}_pkey
290                         PRIMARY KEY (service_id, asset_id, representation_id))
291                      INHERITS (cornea.asset)");
292     $dbh->do("CREATE OR REPLACE FUNCTION cornea.make_asset(in_service_id integer, in_asset_id bigint, in_repid integer, in_storage_location smallint[]) RETURNS void AS 'delete from asset where service_id = \$1 and asset_id = \$2 and representation_id = \$3; insert into asset_${tbl} (service_id, asset_id, representation_id, storage_location) VALUES (\$1, \$2, \$3, \$4);' LANGUAGE sql");
293     $dbh->commit();
294   };
295   if($@) {
296     my $err = $@;
297     eval { $dbh->rollback; };
298     return (-1, "already initialized") if $err =~ /already exists/;
299     return (-1, $err);
300   }
301   return 0;
302 }
303
304 sub setupAssetQueue {
305   my $self = shift;
306   my $config = Cornea::Config->new();
307   my $host = shift;
308   gethostbyname($host) || die "could not resolve $host\n";;
309   (my $tbl = $host) =~ s/\-/_/g;
310   $tbl =~ s/\..*//;
311   my $phost = $config->get('sysinfo::nodename');
312   (my $ptbl = $phost) =~ s/\-/_/g;
313   $ptbl =~ s/\..*//;
314   my $dbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea",
315                          $config->get("DB::user"),
316                          $config->get("DB::pass"),
317                          { PrintError => 0, RaiseError => 1, AutoCommit => 1 },
318                         );
319   $dbh->do("set client_min_messages = 'WARNING'");
320   $dbh->begin_work();
321   eval {
322     $dbh->do("CREATE TABLE cornea.asset_$tbl
323                     (CONSTRAINT asset_${tbl}_uc
324                         PRIMARY KEY (service_id, asset_id, representation_id))
325                      INHERITS (cornea.asset)");
326     $dbh->do("CREATE TABLE cornea.asset_${tbl}_queue
327                      (LIKE cornea.asset
328                       EXCLUDING CONSTRAINTS
329                       EXCLUDING INDEXES)");
330     $dbh->do(<<SQL);
331 CREATE FUNCTION cornea.populate_asset_${tbl}_queue() RETURNS TRIGGER
332   AS '
333 DECLARE
334 BEGIN
335   INSERT INTO cornea.asset_${tbl}_queue
336               (asset_id, service_id, representation_id, storage_location)
337        VALUES (NEW.asset_id, NEW.service_id, NEW.representation_id,
338                NEW.storage_location);
339   RETURN NEW;
340 END
341 ' LANGUAGE plpgsql
342 SQL
343     $dbh->do("CREATE TRIGGER asset_${tbl}_queue_trigger
344                 AFTER INSERT OR UPDATE ON cornea.asset_${ptbl}
345                 FOR EACH ROW
346                 EXECUTE PROCEDURE cornea.populate_asset_${tbl}_queue()");
347     $dbh->commit();
348   };
349   if($@) {
350     my $err = $@;
351     eval { $dbh->rollback; };
352     return (-1, "init-metanode first")
353       if $err =~ /"cornea.asset_$ptbl" does not exist/;
354     return (-1, "already initialized") if $err =~ /already exists/;
355     die $err;
356   }
357   return 0;
358 }
359
360 sub destroyAssetQueue {
361   my $self = shift;
362   my $config = Cornea::Config->new();
363   my $host = shift;
364   gethostbyname($host) || die "could not resolve $host\n";;
365   (my $tbl = $host) =~ s/\-/_/g;
366   $tbl =~ s/\..*//;
367   my $phost = $config->get('sysinfo::nodename');
368   (my $ptbl = $phost) =~ s/\-/_/g;
369   $ptbl =~ s/\..*//;
370   my $dbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea",
371                          $config->get("DB::user"),
372                          $config->get("DB::pass"),
373                          { PrintError => 0, RaiseError => 1, AutoCommit => 1 },
374                         );
375   $dbh->begin_work();
376   eval {
377     $dbh->do("DROP TRIGGER asset_${tbl}_queue_trigger ON cornea.asset_${ptbl}");
378     $dbh->do("DROP FUNCTION cornea.populate_asset_${tbl}_queue()");
379     $dbh->do("DROP TABLE cornea.asset_${tbl}_queue");
380     $dbh->do("DROP TABLE cornea.asset_$tbl");
381     $dbh->commit();
382   };
383   if($@) {
384     my $err = $@;
385     eval { $dbh->rollback; };
386     return (-1, "already perfomed") if $err =~ /does not exist/;
387     return (-1, $err);
388   }
389   return 0;
390 }
391
392 sub initialAssetSynch {
393   my $self = shift;
394   my $config = Cornea::Config->new();
395   my $host = shift;
396   gethostbyname($host) || die "could not resolve $host\n";;
397   (my $tbl = $host) =~ s/\-/_/g;
398   $tbl =~ s/\..*//;
399   my $phost = $config->get('sysinfo::nodename');
400   (my $ptbl = $phost) =~ s/\-/_/g;
401   $ptbl =~ s/\..*//;
402   my $total_rows = 0;
403   my $dbh = DBI->connect("dbi:Pg:host=$host;dbname=cornea",
404                          $config->get("DB::user"),
405                          $config->get("DB::pass"),
406                          { PrintError => 0, RaiseError => 1, AutoCommit => 1 },
407                         );
408   my $ldbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea",
409                          $config->get("DB::user"),
410                          $config->get("DB::pass"),
411                          { PrintError => 0, RaiseError => 1, AutoCommit => 1 },
412                         );
413   $dbh->begin_work();
414   $ldbh->begin_work();
415   my $problem;
416   eval {
417     $problem = "peer not locally initialized";
418     my $isth = $ldbh->prepare(
419         "INSERT INTO cornea.asset_${tbl}
420                     (asset_id, service_id,
421                      representation_id, storage_location)
422               VALUES (?,?,?,?::smallint[])");
423     $problem = "remote not initialized";
424     $dbh->do("DECLARE initpull CURSOR FOR
425                SELECT asset_id, service_id,
426                       representation_id, storage_location
427                  FROM cornea.asset_${tbl}");
428     $problem = "remote peer not initialized";
429     $dbh->do("TRUNCATE cornea.asset_${ptbl}_queue");
430     $problem = "error pulling remote data";
431     my $sth = $dbh->prepare("FETCH FORWARD 10000 FROM initpull");
432     my $internal_rows_moved;
433     do {
434       $internal_rows_moved = 0;
435       $sth->execute();
436       while(my @row = $sth->fetchrow()) {
437         $problem = "error insert local data";
438         $isth->execute(@row);
439         $internal_rows_moved++;
440         $problem = "error pulling remote data";
441       }
442       $total_rows += $internal_rows_moved;
443     } while($internal_rows_moved);
444     $dbh->do("CLOSE initpull");
445     $dbh->commit();
446     $ldbh->commit();
447   };
448   if($@) {
449     my $err = $@;
450     eval { $ldbh->rollback; };
451     eval { $dbh->rollback; };
452     return (-1, "$problem:\n$err");
453   }
454   return (0, "$total_rows copied");
455 }
456 sub pullAssetTable {
457   my $self = shift;
458   my $config = Cornea::Config->new();
459   my $host = shift;
460   my $timeout = shift || 1;
461   gethostbyname($host) || die "could not resolve $host\n";;
462   (my $tbl = $host) =~ s/\-/_/g;
463   $tbl =~ s/\..*//;
464   my $phost = $config->get('sysinfo::nodename');
465   (my $ptbl = $phost) =~ s/\-/_/g;
466   $ptbl =~ s/\..*//;
467   my $total_rows = 0;
468   my $dbh = DBI->connect("dbi:Pg:host=$host;dbname=cornea",
469                          $config->get("DB::user"),
470                          $config->get("DB::pass"),
471                          { PrintError => 0, RaiseError => 1, AutoCommit => 1 },
472                         );
473   my $ldbh = DBI->connect("dbi:Pg:host=localhost;dbname=cornea",
474                          $config->get("DB::user"),
475                          $config->get("DB::pass"),
476                          { PrintError => 0, RaiseError => 1, AutoCommit => 1 },
477                         );
478   while(1) {
479     $dbh->begin_work();
480     $ldbh->begin_work();
481     eval {
482       my $from = $dbh->prepare("DELETE FROM cornea.asset_${ptbl}_queue RETURNING *");
483       my $todel = $ldbh->prepare("DELETE FROM cornea.asset
484                  WHERE asset_id = ? and service_id = ? and representation_id = ?");
485       my $toins = $ldbh->prepare(
486           "INSERT INTO cornea.asset_${tbl}
487                       (asset_id, service_id,
488                        representation_id, storage_location)
489                 VALUES (?,?,?,?::smallint[])");
490       $from->execute();
491       while(my @row = $from->fetchrow()) {
492         $todel->execute(@row[0..2]);
493         $toins->execute(@row);
494         $total_rows++;
495       }
496       $ldbh->commit;
497       $dbh->commit;
498     };
499     if($@) {
500       my $err = $@;
501       eval { $ldbh->rollback; };
502       eval { $dbh->rollback; };
503       return (-1, "$total_rows compied.\n$err");
504     }
505     sleep($timeout);
506   }
507   return (0, "$total_rows copied");
508 }
509
510 sub listAssetTables {
511   my $self = shift;
512   $self->_2pc_generic(sub {
513     my $dsn = shift;
514     my $dbh = shift;
515     my $rv = shift;
516     $$rv ||= {};
517     my $sth = $dbh->prepare(<<SQL);
518
519     select relname, (case when pg_trigger.oid is null
520                           then 'remote'
521                           else 'master' end) as partition_type
522       from pg_class
523       join (select inhrelid
524               from pg_inherits
525              where inhparent in (select oid
526                                    from pg_class
527                                   where relname='asset'
528                                     and relnamespace in (select oid
529                                                            from pg_namespace
530                                                           where nspname = 'cornea'))) as c
531         on (pg_class.oid = inhrelid)
532  left join pg_trigger
533         on (c.inhrelid = pg_trigger.tgrelid)
534
535 SQL
536     $sth->execute();
537     my $master;
538     my $slaves = {};
539     while(my @row = $sth->fetchrow()) {
540       if ($row[1] eq 'master') {
541         $master = $row[0];
542       }
543       else {
544         my $qsize = $dbh->prepare("select count(*)
545                                     from cornea.$row[0]_queue");
546         $qsize->execute();
547         my ($qsize_result) = $qsize->fetchrow();
548         $qsize->finish;
549         $slaves->{$row[0]} = "$qsize_result rows behind";
550       }
551     }
552     ${$rv}->{$dsn} = { master => $master, slaves => $slaves };
553   });
554 }
555
556 my $_g_mc;
557 my $_g_mc_id = '';
558
559 sub asset_cache_replace {
560   my $self = shift;
561   my ($serviceId, $assetId, $repId, $snl) = @_;
562   my $all = $self->getNodes(['open','closed','truant']);
563   my @nodes = sort (map { $_->ip() } ($all->items()));
564   my $id = join(',', @nodes);
565   if (!$_g_mc or $_g_mc_id ne $id) {
566     $_g_mc_id = $id;
567     $_g_mc = memcached_create();
568     memcached_behavior_set($_g_mc, MEMCACHED_BEHAVIOR_DISTRIBUTION(),
569                            MEMCACHED_DISTRIBUTION_CONSISTENT());
570     foreach my $ip (@nodes) {
571       memcached_server_add($_g_mc, $ip, 11211);
572     }
573   }
574   memcached_set($_g_mc, "$serviceId-$assetId-$repId",
575                 join(',', map { $_->id() } ($snl->items())));
576 }
577 1;
Note: See TracBrowser for help on using the browser.