root/trunk/perl/lib/Cornea/Queue.pm

Revision 4, 1.4 kB (checked in by jesus, 9 years ago)

more code, at some point, I should see if this stuff even works

Line 
1 package Cornea::Queue;
2 use strict;
3 use YAML ();
4 use Cornea::Config;
5 use Net::Stomp;
6
7 sub __reconnect {
8   my $self = shift;
9   my $config = Cornea::Config->new();
10   if($self->{stomp}) {
11     eval { $self->{stomp}->disconnect(); };
12     delete $self->{stomp};
13   }
14   my $stomp = Net::Stomp->new( { hostname => $config->get("MQ::hostname"),
15                                  port => $config->get("MQ::port") });
16   foreach (@{$self->{queues}}) {
17     $stomp->subscribe( { destination => $_, ack => 'client' } );
18   }
19   $self->{stomp} = $stomp;
20 }
21
22 sub new {
23   my $class = shift;
24   my $self = bless { queues => [@_] }, $class;
25   $self->__reconnect();
26   return $self;
27 }
28
29 sub enqueue {
30   my $self = shift;
31   my $retried = 0;
32   my $config = Cornea::Config->new();
33   my ($op, $detail) = @_;
34   my $payload = YAML::Dump($op, $detail);
35   while(1) {
36     last unless eval {
37       $self->{stomp}->send(
38         { destination => $config->get("MQ::queue_" . lc($op)),
39           body => $payload }
40       );
41     } || $@;
42     last if ($retried);
43     $self->__reconnect();
44     $retried = 1;
45   }
46   return 1;
47 }
48
49 sub dequeue {
50   my $self = shift;
51   my $sub = shift;
52  
53   my $frame = $self->{stomp}->receive_frame;
54   my ($op, $detail) = YAML::Load($frame->body);
55   if($sub->($op, $detail)) {
56     $self->{stomp}->ack( { frame => $frame } );
57   }
58 }
59
60 sub worker {
61   my $self = shift;
62   my $sub = shift;
63   while(1) {
64     $self->dequeue($sub);
65   }
66 }
67
68 1;
Note: See TracBrowser for help on using the browser.