Wednesday 20 May 2009

Stomping on the Rabbit, Pt2

So now I have this all installed, I have to start using it. Net::Stomp (see cpan.org) is the interface I would like to use, but the setup seems very much done for ActiveMQ (as some of the headers are 'activemq.xxxxx'.

This means that by default, I only seem to be able to send and receive messages to a queue that exists only whilst someone is listening.

The example scripts that come with the RabbitMQ-Stomp distribution only show this for PERL, however, the Ruby examples show much more (persistent queues, topics, etc).

Thankfully, I found the following post

http://tinyurl.com/pdznw9

which explains the headers for RabbitMQ to produce persistent queues and operating topics. For Net::Stomp, the key is to add these headers to the key-value pairs in your connection/send hash, and happily they go through. Here are some examples:

Persistent Queues:

Receiver -

my $stomp = Net::Stomp->new({hostname=>'localhost', port=>'61613'});
$stomp->connect({login=>'guest', passcode=>'guest'}) or croak $EVAL_ERROR;

$SIG{INT} = sub {
$stomp->unsubscribe({
destination => q(/queue/father/ted),
});
exit;
};

$stomp->subscribe({
destination => q(/queue/super/ted),
q{auto-delete} => q{false}, # setting these flags will make your queue remain whilst
q{durable} => q{true}, # the subscriber(s) go away, and pick up messages afterwards
ack => q(client),
});

while (1) {
my $frame = $stomp->receive_frame;
print $frame;
print $frame->body . "\n";
$stomp->ack({frame=>$frame});
last if $frame->body eq 'QUIT';
}

$stomp->disconnect;

Sender -

my $stomp = Net::Stomp->new({hostname=>'localhost', port=>'61613'});
$stomp->connect({login=>'guest', passcode=>'guest'}) or croak $EVAL_ERROR;
$stomp->send({destination => '/queue/super/ted',
bytes_message=>1,
body=>($ARGV[0] or "test\0message")});
$stomp->disconnect;

Topics:

Topics are handled differently in RabbitMQ to ActiveMQ. In ActiveMQ, they sit under a namespace /topic/xxx/yyy and a combination of the client-id, destination, exchange and routing key all make up the subscriber to the topic

In the case of RabbitMQ, it seems a bit more generic than that, but has some differences which make it (as far as I can see) non-persistent.

Receiver -

my $stomp = Net::Stomp->new({hostname=>'localhost', port=>'61613'});
$stomp->connect({login=>'guest', passcode=>'guest'}) or croak $EVAL_ERROR;

$SIG{INT} = sub {
exit;
};

$stomp->subscribe({
destination => q{bananaman}, # needs to be a unique client-id
exchange => q{amq.topic},
routing_key => q{bananas},
});

while (1) {
my $frame = $stomp->receive_frame;
print $frame;
print $frame->body . "\n";
last if $frame->body eq 'QUIT';
}

$stomp->disconnect;

This sets up a receiver, which if you use rabbitmqctl list_queues shows a queue bananaman which will receive messages (whilst he is listening) to the topic bananas

using the following gives an anonymous looking receiver queue

$stomp->subscribe({
id => q{banaman}, # needs to be a unique client-id
destination => q{},
exchange => q{amq.topic},
routing_key => q{bananas},
});

which you can unsubscribe from explicitly

$stomp->subscribe({
id => q{banaman},
});

However, it would appear that the unsubscription occurs anyway, so unfortunately, any posts to the topic bananas would be lost to bananaman whilst he is away.

Sender for topics -

my $stomp = Net::Stomp->new({hostname=>'localhost', port=>'61613'});
$stomp->connect({login=>'guest', passcode=>'guest'}) or croak $EVAL_ERROR;
$stomp->send({destination => q{bananas},
exchange => q{amq.topic},
body=>($ARGV[0] or "test\0message")});
$stomp->disconnect;

You will notice here that the destination for this message is the routing_key, and that the exchange flag is set.

The problem here is that essentially, each receiver to a particular topic (routing_key) forms a temporary queue to which the message is added, and then sent from, but the queue is just that, temporary and goes away when the receiver does, so that receiver will never get the message if they are not present when the message is sent to the message-queue.

I am very keen to hear from anyone who has found out how to work around this.

The other main problem that we have seen due to this is MQ agnosticism doesn't exist using Net::Stomp, as essentially the problem is that it is written with ActiveMQ in mind, and just feeds headers through. This means that there is no conversion of the headers between using ActiveMQ-STOMP, RabbitMQ-STOMP or any other message-queue-STOMP that is out there. This means that there is no way to easily hot-swap message-queues without code re-write. Again, I'd be happy to hear form anyone who has worked out how to get around this.

As for now, it looks as though it is going to be ActiveMQ, as this has been installed centrally for us, and appears to manage the persistence a bit better, along with headers which are documented in Net::Stomp.

Friday 15 May 2009

Stomping on the Rabbit

we are trying to make a move to using message queues in my group to deal with pipelines and talking to other apps.

There should be some great advantages for us, and I am quite excited by this.

First thing, set up a STOMP message queue.

Now, I am currently writing a simple message_queue based on ClearPress, but this is not yet ready, so I have just downloaded and installed RabbitMQ.

RabbitMQ is written in Erlang, (which I have just set myself the challenge of learning from the Pragprog book Programming Erlang by Joe Armstrong). The first challenge was setting it up.

First: You need erlang installed. I had already done this, so wasn't a problem. Jsut make sure that erlc is in your path.

Now, I had dowloaded the latest release tarball, and expanded this, setting up in my sandbox area. However, this threw my a serious curveball with trying to install STOMP.

Thankfully, Google is my friend, and someone had the same problem, since the STOMP needs installing against the correct version number, so I hereby give the definitive installation guide to getting RabbitMQ up and running in a sandbox on MacOSX.

(I accept no responsibility for this not working on your machine!)

1) Install Mercurial (yet another distributed version control system, but the one which RabbitMQ is on).

2) (Thanks to everyone on this page http://tinyurl.com/pk7xfd)
hg clone http://hg.rabbitmq.com/rabbitmq-server
hg clone http://hg.rabbitmq.com/rabbitmq-codegen
hg clone http://hg.rabbitmq.com/rabbitmq-stomp
(cd rabbitmq-server; hg up rabbitmq_v1_5_4)
(cd rabbitmq-codegen; hg up rabbitmq_v1_5_4)
(cd rabbitmq-stomp; hg up rabbitmq_v1_5_3)

3) (At this point, you need to check your version of python and simplejson, which needs installing)

4) In the various MakeFiles, alter source roots to point to where you want the various db and log files to go, where your rabbit source root is, etc..

Eg.

I set up in my sandbox a folder 'rabbitmq', in which I put a logs dir, mnesia (db files) dir and rabbit-mnesia dir (into which I put the rabbitmq.conf file)

In 'rabbitmq-server/Makefile'

RABBITMQ_NODENAME=rabbit
RABBITMQ_SERVER_START_ARGS=/my/sandbox/rabbitmq/rabbitmq.conf
RABBITMQ_MNESIA_DIR=/my/sandbox/rabbitmq/$(RABBITMQ_NODENAME)-mnesia
RABBITMQ_LOG_BASE=/my/sandbox/rabbitmq/logs


4) make -C rabbitmq-server
make -C rabbitmq-stomp run

Hey presto, you now have a rabbitmq-stomp server up and running.

Now you have done this, you can try either the perl or ruby tests.

Hope this guide is of use to someone.

Cheers

Andy