root/trunk/perl/lib/Cornea.pm

Revision 5, 6.2 kB (checked in by jesus, 5 years ago)

allow validation of input -- per leon

Line 
1 package Cornea;
2
3 use strict;
4 use Cornea::Config;
5 use Cornea::RepresentationInfo;
6 use Cornea::RecallTable;
7 use Cornea::Queue;
8 use Cornea::StorageNode;
9 use Cornea::StorageNodeList;
10
# Constructor: returns a new, empty Cornea object.
# Works as a class method or as an instance method; when called on an
# existing object the new one is blessed into that object's class.
sub new {
  my ($proto) = @_;
  my $class = ref($proto) || $proto;
  return bless {}, $class;
}
16
# Accept a new asset: run the service's repId-0 representation's validate()
# over $input, then store the input as representation 0.
#
# Arguments (after $self): $input, $serviceId, $assetId.
# Returns whatever store() returns.
sub submit {
  my ($self, $input, $serviceId, $assetId) = @_;

  # The return value of validate() is not inspected here.
  # NOTE(review): presumably validate() dies on bad input -- confirm.
  Cornea::RecallTable->new()
                     ->repInfo($serviceId, 0)
                     ->validate($serviceId, $input);

  return $self->store($input, $serviceId, $assetId, 0);
}
27
# Store one representation of an asset on the storage nodes and record it.
#
# Arguments (after $self):
#   $input      - payload passed to put() on the first node
#   $serviceId  - service the asset belongs to
#   $assetId    - asset identifier
#   $repId      - representation identifier
#
# Steps:
#   1. put() $input on the first open node that accepts it (the "gold" copy).
#   2. put() from the gold node onto further nodes until
#      $repinfo->replicationCount copies exist or no candidate is left.
#   3. If still short: when Cornea::Config->must_be_complete is true, roll
#      back and die; otherwise queue a REPLICATE job for the missing copies.
#   4. Record the node set with the recall table's insert().
#   5. Queue a PROCESS job when the representation has dependents.
#
# Returns the Cornea::StorageNodeList of nodes that hold a copy.
#
# Dies when no node accepts the payload, when the replication count cannot
# be met (and cannot be deferred), when insert() fails, or when the PROCESS
# job cannot be queued.  Every copy written by this call is deleted first.
sub store {
  my $self = shift;
  my $input = shift;
  my ($serviceId,$assetId,$repId) = @_;

  my $rt = Cornea::RecallTable->new();
  my $repinfo = $rt->repInfo($serviceId, $repId);
  my $N = $rt->getOpenNodes();              # candidates not yet tried
  my $S = Cornea::StorageNodeList->new();   # nodes holding a copy
  my $gold = undef;                         # node holding the first copy

  # Undo helper: delete every copy this call has written so far.
  my $rollback = sub {
    foreach my $n ($S->items) { $n->delete($serviceId, $assetId, $repId); }
  };

  # First copy: try the open nodes in order; every node tried (successful or
  # not) leaves the candidate list.
  foreach my $n ($N->items) {
    my $stored = $n->put($input,$serviceId,$assetId,$repId);
    $N->remove($n);
    next unless $stored;
    $S->add($n);
    $gold = $n;
    last;
  }

  # Nothing was stored at all (the old message claimed one node was left).
  die "No storage node accepted the asset" if($S->count == 0);

  while($S->count < $repinfo->replicationCount) {
    my $T = $N->copy();
    # NOTE(review): presumably drops candidates too close to the nodes in $S
    # according to $repinfo->distance -- confirm against StorageNodeList.
    $T->removeWithin($S, $repinfo->distance);
    last if($T->count == 0); # can't find adequate nodes
    foreach my $n ($T->items) {
      if ($n->put($gold, $serviceId, $assetId, $repId)) {
        $S->add($n);
        $N->remove($n);
        last; # break out and recalc so that distance is right
      }
      else {
        $T->remove($n);
        # Also drop the node from $N, as the first loop does; otherwise the
        # next pass copies $N again and retries a node known to be failing.
        $N->remove($n);
      }
    }
    last if($T->count == 0); # can't find adequate working nodes
  }

  if ($S->count < $repinfo->replicationCount) {
    my $config = Cornea::Config->new();
    if($config->must_be_complete()) {
      $rollback->();
      die "Failed to reach required replication count";
    }
    else {
      # Defer the missing copies to the REPLICATE queue.
      my $copies_needed = $repinfo->replicationCount - $S->count;
      if (not Cornea::Queue::enqueue('REPLICATE',
                                     { 'serviceId' => $serviceId,
                                       'assetId' => $assetId,
                                       'repId' => $repId,
                                       'copies' => $copies_needed })) {
        $rollback->();
        die "Failed to reach required replication count";
      }
    }
  }

  if (not $rt->insert($serviceId,$assetId,$repId,$S)) {
    $rollback->();
    die "Failed to record metadata";
  }

  if (scalar($repinfo->dependents) > 0) {
    if (not Cornea::Queue::enqueue('PROCESS',
                                   { 'serviceId' => $serviceId,
                                     'assetId' => $assetId,
                                     'repId' => $repId })) {
      # NOTE(review): the copies are removed here but the insert() above is
      # not undone in this sub -- confirm whether the recall table needs a
      # matching cleanup.
      $rollback->();
      die "Failed to queue work against asset";
    }
  }

  return $S;
}
106
# Minimal logger used by process() and worker().
# No log method is defined elsewhere in this file, so without it every
# $self->log(...) call dies with "Can't locate object method" instead of
# reporting the failure.  Writes its arguments to STDERR unchanged; a
# subclass may override it.
sub log {
  my $self = shift;
  print STDERR @_;
  return;
}

# Handle a PROCESS job: derive every dependent representation of an asset.
#
# $detail is a hash ref with the keys serviceId, assetId and repId naming the
# source representation.  The source is fetched from the first node (as
# listed by the recall table's find()) that yields it, then each dependent
# representation's transform() output is written with store().
#
# Failures to fetch, transform or store are logged through $self->log and
# do not propagate; a failing dependent does not stop the remaining ones.
# Returns nothing meaningful.
sub process {
  my $self = shift;
  my $detail = shift;
  my $serviceId = $detail->{serviceId};
  my $assetId = $detail->{assetId};
  my $repId = $detail->{repId};
  my $rt = Cornea::RecallTable->new();
  my $repinfo = $rt->repInfo($serviceId, $repId);

  # Nothing to derive from this representation.
  return if(scalar($repinfo->dependents) == 0);

  # Take the source from the first node that hands back something true.
  # NOTE(review): the loop stops on a true value while the check below
  # tests definedness; confirm what fetch() returns on failure before
  # aligning the two.
  my $input = undef;
  my $N = $rt->find($serviceId, $assetId, $repId);
  foreach my $n ($N->items) {
    $input = $n->fetch($serviceId, $assetId, $repId);
    last if($input);
  }
  if (not defined $input) {
    $self->log("FAILED($serviceId, $assetId, $repId) -> fetch\n");
    return;
  }

  foreach my $deprepinfo ($repinfo->dependents) {
    my $repOutId = $deprepinfo->repId;
    my $output = undef;
    # transform() may die; treat that the same as returning undef.
    eval {
      $output = $deprepinfo->transform($serviceId, $input,
                                       $repinfo->repId, $repOutId);
    };
    if ($@ or not defined $output) {
      $self->log("FAILED($serviceId, $assetId, $repId) -> transform($repOutId)\n");
    }
    else {
      # store() dies on failure; log it and carry on with the next dependent.
      eval {
        $self->store($output, $serviceId, $assetId, $repOutId);
      };
      if($@) {
        $self->log("FAILED($serviceId, $assetId, $repId) -> store($repOutId)\n");
      }
    }
  }
}
# Handle a REPLICATE job: add missing copies of an already stored
# representation.
#
# $detail is a hash ref with the keys serviceId, assetId, repId and copies
# (the number of additional copies wanted).  Nodes already holding the
# representation are excluded from the candidates.  Copies that could not
# be placed are re-queued as a new REPLICATE job, and the enlarged node set
# is recorded with the recall table's insert().
#
# Dies when the re-queue or the insert() fails; in both cases only the
# copies created by this call are deleted again.
sub replicate {
  my $self = shift;
  my $detail = shift;
  my $serviceId = $detail->{serviceId};
  my $assetId = $detail->{assetId};
  my $repId = $detail->{repId};
  my $copies = $detail->{copies};
  my $rt = Cornea::RecallTable->new();
  my $repinfo = $rt->repInfo($serviceId, $repId);

  my $N = $rt->getOpenNodes();                      # candidate nodes
  my $S = $rt->find($serviceId, $assetId, $repId);  # nodes holding a copy
  my $C = Cornea::StorageNodeList->new();           # copies added by this call

  # Nodes that already hold the representation are not candidates.
  foreach my $n ($S->items) {
    $N->remove($n);
  }

  while ($copies > 0) {
    my $T = $N->copy();
    # NOTE(review): presumably drops candidates too close to the nodes in $S
    # according to $repinfo->distance -- confirm against StorageNodeList.
    $T->removeWithin($S, $repinfo->distance);
    last if($T->count == 0); # can't find adequate nodes
    foreach my $n ($T->items) {
      # NOTE(review): the source handed to put() is the whole holder list
      # $S, whereas store() passes a single node -- confirm put() accepts both.
      if ($n->put($S, $serviceId, $assetId, $repId)) {
        $S->add($n);
        $C->add($n);
        $N->remove($n);
        $copies--;
        last; # break out and recalc so that distance is right
      }
      else {
        $T->remove($n);
        # Drop the node from $N as well; otherwise the next pass copies $N
        # again and retries a node that is known to be failing.
        $N->remove($n);
      }
    }
    last if($T->count == 0); # can't find adequate working nodes
  }

  # If the following updates fail, we only undo the storage changeset $C
  if ($copies > 0) {
    if (not Cornea::Queue::enqueue('REPLICATE',
                                   { 'serviceId' => $serviceId,
                                     'assetId' => $assetId,
                                     'repId' => $repId,
                                     'copies' => $copies })) {
      foreach my $n ($C->items) { $n->delete($serviceId, $assetId, $repId); }
      die "Failed to reach required replication count";
    }
  }

  # we only update the recall table if this call added at least one copy
  if ($C->count > 0 and not $rt->insert($serviceId,$assetId,$repId,$S)) {
    foreach my $n ($C->items) { $n->delete($serviceId, $assetId, $repId); }
    die "Failed to record metadata";
  }
}
201
202
# Hand a job handler to a fresh Cornea::Queue via its worker() method.
# The handler receives an operation name and a detail hash ref: PROCESS
# jobs go to process(), REPLICATE jobs to replicate(), and anything else is
# reported through $self->log.
sub worker {
  my ($self) = @_;

  my $handler = sub {
    my ($op, $detail) = @_;
    return $self->process($detail)   if $op eq 'PROCESS';
    return $self->replicate($detail) if $op eq 'REPLICATE';
    return $self->log("UNKNOWNE Queue op($op)\n");
  };

  return Cornea::Queue->new()->worker($handler);
}
217
218 1;
Note: See TracBrowser for help on using the browser.