Changeset 4
- Timestamp:
- 08/18/09 20:49:19 (4 years ago)
- Files:
-
- trunk/perl/lib/Cornea.pm (modified) (4 diffs)
- trunk/perl/lib/Cornea/ApacheStore.pm (added)
- trunk/perl/lib/Cornea/Config.pm (modified) (1 diff)
- trunk/perl/lib/Cornea/Queue.pm (modified) (1 diff)
- trunk/perl/lib/Cornea/RecallTable.pm (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
trunk/perl/lib/Cornea.pm
r3 r4 2 2 3 3 use strict; 4 use Switch;4 use Cornea::Config; 5 5 use Cornea::RepresentationInfo; 6 6 use Cornea::RecallTable; … … 8 8 use Cornea::StorageNode; 9 9 use Cornea::StorageNodeList; 10 11 our $MUST_BE_COMPLETE = 0;12 10 13 11 sub new { … … 61 59 62 60 if ($S->count < $repinfo->replicationCount) { 63 if($MUST_BE_COMPLETE) { 61 my $config = Cornea::Config->new(); 62 if($config->must_be_complete()) { 64 63 foreach my $n ($S->items) { $n->delete($serviceId, $assetId, $repId); } 65 64 die "Failed to reach required replication count"; … … 190 189 } 191 190 191 192 192 sub worker { 193 193 my $self = shift; 194 195 while(my ($op, $detail) = Cornea::Queue::dequeue()) { 196 switch($op) { 197 case "PROCESS" { $self->process($detail); } 198 case "REPLICATE" { $self->replicate($detail); } 199 else { $self->log("UNKNOWNE Queue op($op)\n"); } 200 } 201 } 194 my $queue = Cornea::Queue->new(); 195 196 $queue->worker( 197 sub { 198 my $op = shift; 199 my $detail = shift; 200 if ($op eq 'PROCESS') { $self->process($detail); } 201 elsif ($op eq 'REPLICATE') { $self->replicate($detail); } 202 else { $self->log("UNKNOWNE Queue op($op)\n"); } 203 } 204 ); 202 205 } 203 206 trunk/perl/lib/Cornea/Config.pm
r3 r4 2 2 use strict; 3 3 4 use strict; 5 use POSIX qw/uname/; 6 7 sub default_config_file { '/etc/cornea${host}.conf' } 8 9 { 10 my %_g_Config = (); 11 sub _g_Config { 12 my $key = shift; 13 $_g_Config{$key} = shift if @_; 14 $_g_Config{$key}; 15 } 16 sub _g_Config_unset { delete $_g_Config{shift}; } 17 sub _g_Config_isset { exists $_g_Config{shift}; } 18 } 19 20 sub new { 21 my $class = shift; 22 $class->_init(@_) unless _g_Config_isset('config_file'); 23 return bless sub { die "Illegal access"; }, __PACKAGE__; 24 } 25 26 sub _init { 27 my $class = shift; 28 my $config_file = shift || $class->default_config_file; 29 my @uname = POSIX::uname(); 30 my $i = 0; 31 foreach (qw/sysname nodename release version machine/) { 32 _g_Config("sysinfo::$_", $uname[$i++]); 33 } 34 35 eval { 36 my $host = ".$uname[1]"; 37 my $file = eval "\"$config_file\""; 38 $class->read_config($file); 39 }; 40 return unless $@; 41 42 # There was an error, use the default config 43 my $host = ''; 44 my $file = eval "\"$config_file\""; 45 $class->read_config($file); 46 return; 47 } 48 49 sub read_config { 50 my ($class, $file) = @_; 51 open(CONF, "<$file") || die "Could not read config file: $file"; 52 while(<CONF>) { 53 next if /^\s*[;#]/; 54 if(/^\s*([^\s=]+)\s*=\s*(.*)$/) { 55 my $key = lc($1); 56 (my $val = $2) =~ s/\s+$//; 57 _g_Config($key, $val); 58 } 59 } 60 close(CONF); 61 _g_Config('config_file', $file); 62 } 63 64 sub get { _g_Config(lc($_[1])); } 65 sub set { _g_Config(lc($_[1]), $_[2]); } 66 sub unset { _g_Config_unset(lc($_[1])); } 67 sub isset { _g_Config_isset(lc($_[1])); } 68 4 69 1; 70 1; trunk/perl/lib/Cornea/Queue.pm
r3 r4 1 1 package Cornea::Queue; 2 2 use strict; 3 use NET::STOMP; 3 use YAML (); 4 use Cornea::Config; 5 use Net::Stomp; 4 6 5 sub enqueue($$) { 6 my ($op, $detail) = @_; 7 7 sub __reconnect { 8 my $self = shift; 9 my $config = Cornea::Config->new(); 10 if($self->{stomp}) { 11 eval { $self->{stomp}->disconnect(); }; 12 delete $self->{stomp}; 13 } 14 my $stomp = Net::Stomp->new( { hostname => $config->get("MQ::hostname"), 15 port => $config->get("MQ::port") }); 16 foreach (@{$self->{queues}}) { 17 $stomp->subscribe( { destination => $_, ack => 'client' } ); 18 } 19 $self->{stomp} = $stomp; 8 20 } 9 21 10 sub dequeue() { 11 my ($op, $detail); 12 return ($op, $detail); 22 sub new { 23 my $class = shift; 24 my $self = bless { queues => [@_] }, $class; 25 $self->__reconnect(); 26 return $self; 27 } 28 29 sub enqueue { 30 my $self = shift; 31 my $retried = 0; 32 my $config = Cornea::Config->new(); 33 my ($op, $detail) = @_; 34 my $payload = YAML::Dump($op, $detail); 35 while(1) { 36 last unless eval { 37 $self->{stomp}->send( 38 { destination => $config->get("MQ::queue_" . lc($op)), 39 body => $payload } 40 ); 41 } || $@; 42 last if ($retried); 43 $self->__reconnect(); 44 $retried = 1; 45 } 46 return 1; 47 } 48 49 sub dequeue { 50 my $self = shift; 51 my $sub = shift; 52 53 my $frame = $self->{stomp}->receive_frame; 54 my ($op, $detail) = YAML::Load($frame->body); 55 if($sub->($op, $detail)) { 56 $self->{stomp}->ack( { frame => $frame } ); 57 } 58 } 59 60 sub worker { 61 my $self = shift; 62 my $sub = shift; 63 while(1) { 64 $self->dequeue($sub); 65 } 13 66 } 14 67 trunk/perl/lib/Cornea/RecallTable.pm
r3 r4 9 9 10 10 my $config = Cornea::Config->new(); 11 my $dbh = DBI->connect($config-> dsn(),12 $config-> dbuser(),13 $config-> dbpass(),11 my $dbh = DBI->connect($config->get("DB::dsn"), 12 $config->get("DB::user"), 13 $config->get("DB::pass"), 14 14 { PrintError => 0, RaiseError => 1 }, 15 15 );
