root/trunk/perl/lib/Cornea/Queue.pm

Revision 6, 1.7 kB (checked in by jesus, 9 years ago)

robustness in the transport layers (sql and mq) and a skeleton config

Line 
1 package Cornea::Queue;
2 use strict;
3 use YAML ();
4 use Cornea::Config;
5 use Net::Stomp;
6
7 sub __reconnect {
8   my $self = shift;
9   my $config = Cornea::Config->new();
10   if($self->{stomp}) {
11     eval { $self->{stomp}->disconnect(); };
12     delete $self->{stomp};
13   }
14   my $stomphosts = $config->get_list("MQ::hostname");
15   foreach my $hostname ( @$stomphosts ) {
16     eval {
17       my $stomp = Net::Stomp->new( { hostname => $hostame,
18                                      port => $config->get("MQ::port") });
19       $stomp->connect( { login => $config->get("MQ::login"),
20                          passcode => $config->get("MQ::passcode") } ) ||
21         die "could not connect to stomp on $hostname\n";
22     };
23     last unless $@;
24     $stomp = undef;
25   }
26   if($stomp) {
27     foreach (@{$self->{queues}}) {
28       $stomp->subscribe( { destination => $_, ack => 'client' } );
29     }
30   }
31   $self->{stomp} = $stomp;
32 }
33
34 sub new {
35   my $class = shift;
36   my $self = bless { queues => [@_] }, $class;
37   $self->__reconnect();
38   return $self;
39 }
40
41 sub enqueue {
42   my $self = shift;
43   my $retried = 0;
44   my $config = Cornea::Config->new();
45   my ($op, $detail) = @_;
46   my $payload = YAML::Dump($op, $detail);
47   while(1) {
48     last unless eval {
49       $self->{stomp}->send(
50         { destination => $config->get("MQ::queue_" . lc($op)),
51           body => $payload }
52       );
53     } || $@;
54     last if ($retried);
55     $self->__reconnect();
56     $retried = 1;
57   }
58   return 1;
59 }
60
61 sub dequeue {
62   my $self = shift;
63   my $sub = shift;
64  
65   my $frame = $self->{stomp}->receive_frame;
66   my ($op, $detail) = YAML::Load($frame->body);
67   if($sub->($op, $detail)) {
68     $self->{stomp}->ack( { frame => $frame } );
69   }
70 }
71
72 sub worker {
73   my $self = shift;
74   my $sub = shift;
75   while(1) {
76     $self->dequeue($sub);
77   }
78 }
79
80 1;
Note: See TracBrowser for help on using the browser.