Wednesday 20 May 2009

Stomping on the Rabbit, Pt2

So now that I have this all installed, I have to start using it. Net::Stomp (see cpan.org) is the interface I would like to use, but it seems to have been written very much with ActiveMQ in mind (some of the headers are 'activemq.xxxxx').

This means that by default, I only seem to be able to send and receive messages to a queue that exists only whilst someone is listening.

The example scripts that come with the RabbitMQ-Stomp distribution only show this for Perl; the Ruby examples, however, show much more (persistent queues, topics, etc).

Thankfully, I found the following post

http://tinyurl.com/pdznw9

which explains the headers RabbitMQ needs to produce persistent queues and working topics. For Net::Stomp, the key is to add these headers to the key-value pairs in the hash you pass to subscribe/send, and happily they go through. Here are some examples:

Persistent Queues:

Receiver -

use Carp;
use English qw(-no_match_vars);
use Net::Stomp;

my $stomp = Net::Stomp->new({hostname=>'localhost', port=>'61613'});
$stomp->connect({login=>'guest', passcode=>'guest'}) or croak $EVAL_ERROR;

# unsubscribe from the same queue we subscribed to when interrupted
$SIG{INT} = sub {
    $stomp->unsubscribe({
        destination => q(/queue/super/ted),
    });
    exit;
};

$stomp->subscribe({
    destination => q(/queue/super/ted),
    q{auto-delete} => q{false}, # setting these flags will make your queue remain after
    q{durable} => q{true}, # the subscriber(s) go away, and pick up messages afterwards
    ack => q(client),
});

while (1) {
    my $frame = $stomp->receive_frame;
    print $frame; # debug: prints the frame object reference
    print $frame->body . "\n";
    $stomp->ack({frame=>$frame});
    last if $frame->body eq 'QUIT';
}

$stomp->disconnect;

Sender -

use Carp;
use English qw(-no_match_vars);
use Net::Stomp;

my $stomp = Net::Stomp->new({hostname=>'localhost', port=>'61613'});
$stomp->connect({login=>'guest', passcode=>'guest'}) or croak $EVAL_ERROR;
$stomp->send({
    destination => '/queue/super/ted',
    bytes_message => 1,
    body => ($ARGV[0] or "test\0message"),
});
$stomp->disconnect;

Topics:

Topics are handled differently in RabbitMQ to ActiveMQ. In ActiveMQ, they sit under a namespace /topic/xxx/yyy, and a combination of the client-id, destination, exchange and routing key all make up the subscriber to the topic.

In the case of RabbitMQ, it seems a bit more generic than that, but has some differences which make it (as far as I can see) non-persistent.

Receiver -

my $stomp = Net::Stomp->new({hostname=>'localhost', port=>'61613'});
$stomp->connect({login=>'guest', passcode=>'guest'}) or croak $EVAL_ERROR;

$SIG{INT} = sub {
    exit;
};

$stomp->subscribe({
    destination => q{bananaman}, # needs to be a unique client-id
    exchange => q{amq.topic},
    routing_key => q{bananas},
});

while (1) {
    my $frame = $stomp->receive_frame;
    print $frame;
    print $frame->body . "\n";
    last if $frame->body eq 'QUIT';
}

$stomp->disconnect;

This sets up a receiver. Running rabbitmqctl list_queues then shows a queue bananaman, which will receive messages sent to the topic bananas (whilst he is listening).

Using the following instead gives an anonymous-looking receiver queue:

$stomp->subscribe({
    id => q{bananaman}, # needs to be a unique client-id
    destination => q{},
    exchange => q{amq.topic},
    routing_key => q{bananas},
});

which you can unsubscribe from explicitly:

$stomp->unsubscribe({
    id => q{bananaman},
});

However, it would appear that the unsubscription occurs anyway when the receiver disconnects, so unfortunately any posts to the topic bananas whilst bananaman is away are lost to him.

Sender for topics -

my $stomp = Net::Stomp->new({hostname=>'localhost', port=>'61613'});
$stomp->connect({login=>'guest', passcode=>'guest'}) or croak $EVAL_ERROR;
$stomp->send({
    destination => q{bananas},
    exchange => q{amq.topic},
    body => ($ARGV[0] or "test\0message"),
});
$stomp->disconnect;

You will notice here that the destination for this message is the routing_key, and that the exchange header is set.

The problem here is that each receiver of a particular topic (routing_key) gets a temporary queue, to which the message is added and from which it is then delivered. That queue is just that, temporary: it goes away when the receiver does, so a receiver will never get a message if they are not present when it is sent to the message-queue.
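One thing I have not verified is whether the durable and auto-delete flags from the persistent queue example can be combined with the topic headers to keep a named queue alive. The following is only a sketch of what that SUBSCRIBE hash might look like; durable_topic_headers is a made-up helper name, and whether RabbitMQ-Stomp honours this combination is an assumption.

```perl
use strict;
use warnings;

# Hypothetical helper: builds the SUBSCRIBE headers for a topic receiver
# that asks for a named queue to be kept. Untested against a real broker.
sub durable_topic_headers {
    my ($queue_name, $routing_key) = @_;
    return {
        destination    => $queue_name,   # named queue, as in the bananaman example
        exchange       => q{amq.topic},
        routing_key    => $routing_key,
        q{auto-delete} => q{false},      # flags copied from the persistent queue example
        q{durable}     => q{true},
    };
}

my $headers = durable_topic_headers(q{bananaman}, q{bananas});

# Show what would be sent; keys are sorted only to make the output stable.
print join(q{, }, map { "$_=$headers->{$_}" } sort keys %{$headers}), "\n";

# With a connected Net::Stomp object the hash would go straight through:
#   $stomp->subscribe($headers);
```

If this does work, the queue bananaman should still be listed by rabbitmqctl list_queues after the receiver has gone away.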

I am very keen to hear from anyone who has found out how to work around this.

The other main problem that we have seen because of this is that MQ agnosticism doesn't exist with Net::Stomp: it is written with ActiveMQ in mind, and just feeds headers through. There is no conversion of the headers between ActiveMQ-STOMP, RabbitMQ-STOMP or any other message-queue-STOMP that is out there, so there is no easy way to hot-swap message-queues without a code re-write. Again, I'd be happy to hear from anyone who has worked out how to get around this.
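A minimal sketch of one way to at least keep the broker-specific headers in one place, so that swapping brokers means changing a single name rather than every subscribe call. The subscribe_headers helper and the broker names used as keys are my own invention; the RabbitMQ headers are the ones from the examples above, and activemq.prefetchSize is the header shown in the Net::Stomp documentation.

```perl
use strict;
use warnings;

# Broker-specific SUBSCRIBE headers, keyed by a broker name of our own choosing.
my %EXTRA_HEADERS = (
    rabbitmq => { q{auto-delete} => q{false}, q{durable} => q{true} },
    activemq => { q{activemq.prefetchSize} => 1 },
);

# Hypothetical helper: merges the common headers with the broker's own.
sub subscribe_headers {
    my ($broker, $destination) = @_;
    my $extra = $EXTRA_HEADERS{$broker}
        or die "Unknown broker '$broker'\n";
    return { destination => $destination, ack => q{client}, %{$extra} };
}

my $headers = subscribe_headers(q{rabbitmq}, q{/queue/super/ted});
print join(q{, }, map { "$_=$headers->{$_}" } sort keys %{$headers}), "\n";

# With a connected Net::Stomp object:
#   $stomp->subscribe($headers);
```

This only covers the headers. It does nothing about the differences in how the two brokers treat destinations and topics, so it is not a real answer to the hot-swap problem.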

As for now, it looks as though it is going to be ActiveMQ, as this has been installed centrally for us, and appears to manage the persistence a bit better, along with headers which are documented in Net::Stomp.

1 comment:

alexis said...

Andy,

Have you tried supplying an auto-delete=false header in SUBSCRIBE (since the default is "true")? I am not sure if this is the solution. If you are still stuck, please email rabbitmq-discuss and you will definitely get help.

Cheers,

alexis

RabbitMQ