AMQP cake with Ruby: Using Ruby and RabbitMQ together
04 August 2009
There are a bunch of different ways to get connected to RabbitMQ. Thankfully, the community has rallied behind the message queue, and there are a growing number of client libraries that make its functionality accessible.
In this post, we’ll explore the different options for connecting a Ruby program to the AMQP-driven queue server. So, without further ado:
AMQP: the grandfather
Installing
gem sources -a http://gems.github.com
sudo gem install tmm1-amqp
# OR, for the developers in the crowd
git clone git://github.com/tmm1/amqp.git
rake gem
sudo gem install amqp-*.gem
Now that we’ve got it installed, let’s write a quick example to get connected:
require "rubygems"
require 'mq'
EM.run {
  AMQP.start(:host => 'localhost') do
    amq = MQ.new
    EM.add_periodic_timer(1) { amq.queue("noises").publish("moo") }
  end
}
Exactly as it looks, we’re using EventMachine to provide us with a periodic timer. Every second the timer fires and the block runs; in our case, it pushes the message “moo” to the queue named “noises” on the RabbitMQ host (in this example, localhost).
Now, let’s consume a message. This is nearly straight from the examples/ directory of the tmm1-amqp library:
require "rubygems"
require 'mq'
AMQP.start(:host => 'localhost') do
  MQ.queue("noises").publish("moo")
  MQ.queue("noises").publish("quack")
  MQ.queue("noises").publish("bark")
  i = 0
  MQ.queue('noises').subscribe(:ack => true) do |h,m|
    if (i+=1) == 3
      puts 'Shutting down...'
      AMQP.stop{ EM.stop }
    end
    if AMQP.closing?
      puts "#{m} (ignored, redelivered later)"
    else
      puts m
      h.ack
    end
  end
end
# moo
# quack
# Shutting down...
# bark (ignored, redelivered later)
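In this example, we’ve pushed a few sounds to the noises queue and then subscribed to it, asking for explicit acknowledgements (:ack => true). The third message arrives after we have started shutting down, so it is never acked and will be redelivered later.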
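To see why an unacked message comes back, here is a toy in-memory model in plain Ruby. It doesn’t talk to RabbitMQ at all, and ToyQueue and its methods are names made up for this sketch, not part of the amqp gem. It only assumes what AMQP promises: a message delivered with acks enabled stays "owed" to the broker until the consumer acks it, and goes back on the queue if the consumer disappears first.

```ruby
# Toy model of acknowledged delivery. ToyQueue is a made-up class,
# not part of the amqp gem, and it ignores networking entirely.
class ToyQueue
  def initialize
    @ready   = []   # messages waiting to be delivered
    @unacked = []   # delivered, but not yet acknowledged
  end

  def publish(message)
    @ready << message
  end

  # Hand the next message to a consumer and remember it as unacked.
  def deliver
    message = @ready.shift
    @unacked << message if message
    message
  end

  # The consumer confirms it has finished with the message.
  def ack(message)
    @unacked.delete(message)
  end

  # When the consumer goes away, unacked messages go back on the queue.
  def consumer_closed
    @ready = @unacked + @ready
    @unacked = []
  end

  # Snapshot of what is currently waiting for delivery.
  def ready
    @ready.dup
  end
end

queue = ToyQueue.new
%w[moo quack bark].each { |noise| queue.publish(noise) }

queue.ack(queue.deliver)   # "moo" is delivered and acked
queue.ack(queue.deliver)   # "quack" is delivered and acked
queue.deliver              # "bark" is delivered but never acked
queue.consumer_closed      # so it becomes available again

p queue.ready  # ["bark"]
```

That is the same shape as the output above: moo and quack are consumed for good, bark is waiting for the next consumer.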
There are alternative methods to pull a message off the queue. For instance, you can use pop (if you don’t need an ack of the message):
queue.pop{ |msg|
  unless msg
    # queue was empty
    p [Time.now, :queue_empty!]
    # try again in 1 second
    EM.add_timer(1){ queue.pop }
  else
    # process this message
    p [Time.now, msg]
    # get the next message in the queue
    queue.pop
  end
}
Up until now, we’ve only used one exchange type: direct. Now, there are N+1 ways to skin a cat, so let’s look at sending a message to everyone who is subscribed to a fanout. First off, to understand how this works, let’s define a fanout. A fanout in RabbitMQ terminology is like a direct exchange, except that it represents a 1:N message delivery pattern: every queue bound to it gets a copy of each message. One more term: we need to define a binding as well; you’ll see why in just a minute. Here is the definition of a binding from the RabbitMQ documentation:
‘When you publish a message, you send a "routing key" along with it, that's used by the exchange when it decides which queues to forward a copy of the message on to. The links between exchanges and queues are created through binding, with a "binding pattern" that is used by the exchange when comparing against routing keys.’
Now that we’ve got that under control, let’s actually write one. Again, this is taken nearly directly from examples/mq/clock.rb, if you’d like to look at a more complete example:
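To make the binding idea concrete before touching the real library, here is a toy in-memory model in plain Ruby. It is only a sketch: ToyExchange, bind and publish here are made up for illustration and are not the amqp gem’s API, and queues are plain arrays. A direct exchange copies a message only to queues whose binding key equals the routing key; a fanout ignores keys and copies it to every bound queue.

```ruby
# Toy model of exchanges and bindings. Not the amqp gem's API:
# ToyExchange is a made-up class for illustration only.
class ToyExchange
  def initialize(type)
    @type = type          # :direct or :fanout
    @bindings = []        # list of [binding_key, queue] pairs
  end

  # Link a queue (here just an Array) to this exchange.
  def bind(queue, key = nil)
    @bindings << [key, queue]
    self
  end

  # Copy the message to every queue whose binding matches.
  def publish(message, routing_key = nil)
    @bindings.each do |key, queue|
      queue << message if @type == :fanout || key == routing_key
    end
  end
end

cows  = []
ducks = []

direct = ToyExchange.new(:direct)
direct.bind(cows, 'cow').bind(ducks, 'duck')
direct.publish('moo', 'cow')      # only the queue bound with 'cow' gets it

fanout = ToyExchange.new(:fanout)
fanout.bind(cows).bind(ducks)
fanout.publish('thunder')         # every bound queue gets a copy

p cows   # ["moo", "thunder"]
p ducks  # ["thunder"]
```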
require "rubygems"
require "mq"
AMQP.start(:host => 'localhost') do
  def log *args
    p args
  end
  clock = MQ.new.fanout('clock')
  EM.add_periodic_timer(1){
    log :publishing, time = Time.now
    clock.publish(Marshal.dump(time))
  }
  amq = MQ.new
  amq.queue('every second').bind(amq.fanout('clock')).subscribe{ |time|
    log 'every second', :received, Marshal.load(time)
  }
  amq = MQ.new
  amq.queue('every 5 seconds').bind(amq.fanout('clock')).subscribe{ |time|
    time = Marshal.load(time)
    log 'every 5 seconds', :received, time if time.strftime('%S').to_i%5 == 0
  }
end
Okay. First off, we created a new fanout and, using MQ and EventMachine again, a periodic timer that publishes to the clock fanout (again, 1:N communication). Then we bind each queue (recall that a binding is just a link between a queue and the exchange, its “router”) to the fanout named “clock” and subscribe, so the block runs whenever the fanout delivers a new message. Notice we take two differently named queues (one is called “every second” and the other “every 5 seconds”) and bind them to the same fanout. This works because the fanout provides 1:N communication.
Now, notice that the second subscriber only logs when the seconds value mod 5 is zero, or essentially every 5 seconds. However, if we remove that if modifier, you’ll notice that the queue still receives a message every second. This is because the fanout publishes each message to EVERY queue that is bound to it. That’s how the fanout works.
Now, the AMQP library provides something really kind of neat. You can send “RPC” calls across the wire to the queue. How? Let’s look at another example from the examples/ directory:
require "rubygems"
require 'mq'
AMQP.start(:host => 'localhost') do
  def log *args
    p args
  end
  class HashTable < Hash
    def get(key)
      self[key]
    end
    def set(key, value)
      self[key] = value
    end
  end
  server = MQ.new.rpc('hash table node', HashTable.new)
  client = MQ.new.rpc('hash table node')
  client.set(:now, time = Time.now)
  client.get(:now) do |res|
    log 'client', :now => res, :eql? => res == time
  end
  client.set(:one, 1)
  client.get :one do |res|
    p res
  end
  client.keys do |res|
    log 'client', :keys => res
    AMQP.stop{ EM.stop }
  end
end
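Sweetness. We are taking a HashTable object and exposing it through the queue. The object itself stays on the server side; each call the client makes (set, get, keys) is sent across the queue in a Marshalled format (thanks to the amqp gem), run against the HashTable, and the return value is Marshalled back. When the reply reaches the client’s block, we get a full Ruby object back.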
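The round trip behind a call like client.get(:one) can be sketched in plain Ruby with nothing but Marshal. Treat this as an illustration of the idea rather than the gem’s actual implementation: encode_call and handle_call are names invented here, and the "wire" is just a String instead of a queue.

```ruby
# Sketch of the idea behind RPC over a queue, in plain Ruby.
# The "wire" is just a String here; no broker is involved.
class HashTable < Hash
  def get(key)
    self[key]
  end

  def set(key, value)
    self[key] = value
  end
end

# Client side (hypothetical helper): turn a method call into bytes.
def encode_call(method_name, *args)
  Marshal.dump([method_name, *args])
end

# Server side (hypothetical helper): decode the call, run it on the
# real object, and send the return value back as bytes.
def handle_call(object, payload)
  method_name, *args = Marshal.load(payload)
  Marshal.dump(object.__send__(method_name, *args))
end

server_object = HashTable.new

handle_call(server_object, encode_call(:set, :one, 1))
reply = handle_call(server_object, encode_call(:get, :one))

p Marshal.load(reply)  # 1
```

One thing this makes obvious: Marshal will happily rebuild whatever it is handed, so this style of RPC only makes sense between programs that trust each other.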
A few other notes.
We can create a topic exchange, which is kind of like a fanout, except that instead of 1:N, it’s N:N. To show this, we’ll write a mini chat server (well, we’ll pull it from the very smart blog: http://www.randomhacks.net/articles/2009/05/08/chat-client-ruby-amqp-eventmachine-shoes). Some more quick overhead before we get to the code. When we created the binding before, we set a :key that was equivalent to the name of the queue. We can do better than that and bind to a specific chat channel via the bind method:
queue.bind('chat', :key => $channel)
This binds our queue to the chat exchange so that it receives any message whose routing key matches the name of the channel in the chat program. You can set wildcards and arrays in the key parameter. Now, when we publish a message, we’ll use the routing key to ensure it gets sent to the right bindings:
MQ.topic('chat').publish(" -> #{data}", :routing_key => $channel)
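The wildcard matching a topic exchange does between binding keys and routing keys can be sketched in a few lines of plain Ruby. This is a toy, not the broker’s code, and topic_match? and match_words are names made up for the sketch. It assumes the AMQP topic rules: keys are dot-separated words, "*" stands for exactly one word, and "#" stands for zero or more words.

```ruby
# Sketch of AMQP topic matching. topic_match? and match_words are
# made-up helpers, not part of the amqp gem.
def topic_match?(pattern, routing_key)
  match_words(pattern.split('.'), routing_key.split('.'))
end

def match_words(pattern, words)
  # An exhausted pattern only matches an exhausted routing key.
  return words.empty? if pattern.empty?

  head, *rest = pattern
  case head
  when '#'
    # "#" may swallow 0, 1, 2, ... of the remaining words.
    (0..words.length).any? { |n| match_words(rest, words.drop(n)) }
  when '*'
    # "*" needs exactly one word to be present.
    !words.empty? && match_words(rest, words.drop(1))
  else
    # A literal word must match exactly.
    words.first == head && match_words(rest, words.drop(1))
  end
end

p topic_match?('ruby', 'ruby')              # true
p topic_match?('chat.*', 'chat.ruby')       # true
p topic_match?('chat.*', 'chat.ruby.help')  # false
p topic_match?('chat.#', 'chat.ruby.help')  # true
p topic_match?('chat.#', 'chat')            # true
```

In the chat program the binding key is a plain channel name with no wildcards, so only the first case applies there.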
Now, on to the entire program:
require 'rubygems'
gem 'amqp'
require 'mq'
unless ARGV.length == 2
  STDERR.puts "Usage: #{$0} <channel> <nick>"
  exit 1
end
$channel, $nick = ARGV
AMQP.start(:host => 'localhost') do
  $chat = MQ.topic('chat')
  # Print any messages on our channel.
  queue = MQ.queue($nick)
  queue.bind('chat', :key => $channel)
  queue.subscribe do |msg|
    if msg.index("#{$nick}:") != 0
      puts msg
    end
  end
  # Forward console input to our channel.
  module KeyboardInput
    include EM::Protocols::LineText2
    def receive_line data
      $chat.publish("#{$nick}: #{data}",
                    :routing_key => $channel)
    end
  end
  EM.open_keyboard(KeyboardInput)
end
Last example of the day: We can distribute work in our programs as well.
require "rubygems"
require 'mq'
MAX = 1000
# logging
def log *args
  p args
end
# spawn workers
workers = ARGV[0] ? (Integer(ARGV[0]) rescue 1) : 1
AMQP.fork(workers) do
  log MQ.id, :started
  class Fixnum
    def prime?
      ('1' * self) !~ /^1?$|^(11+?)\1+$/
    end
  end
  class PrimeChecker
    def is_prime? number
      log "prime checker #{MQ.id}", :prime?, number
      number.prime?
    end
  end
  MQ.rpc('prime checker', PrimeChecker.new)
end
# use workers to check which numbers are prime
AMQP.start(:host => 'localhost') do
  prime_checker = MQ.rpc('prime checker')
  (10_000...(10_000+MAX)).each do |num|
    log :checking, num
    prime_checker.is_prime?(num) { |is_prime|
      log :prime?, num, is_prime
      (@primes||=[]) << num if is_prime
      if (@responses = (@responses || 0) + 1) == MAX
        log :primes=, @primes
        EM.stop_event_loop
      end
    }
  end
end
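By using the fork method in AMQP, we are forking several consumers of the messages in the ‘prime checker’ rpc queue. Each worker is a separate process, not a thread, and the broker hands the requests out among them. This shows just how easy it is to spread a consumer for an amqp queue across several processes.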
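For comparison, here is roughly what splitting the same kind of work across processes looks like with nothing but Ruby’s own fork and pipes. This is a sketch under a few assumptions: it needs a Unix-like system where fork is available, the work is cut into fixed slices up front rather than handed out message by message the way a broker does, and prime? and check_in_workers are names made up for the example.

```ruby
# Plain trial-division primality test (hypothetical helper).
def prime?(n)
  return false if n < 2
  (2..Integer.sqrt(n)).none? { |d| (n % d).zero? }
end

# Split numbers into one slice per worker, check each slice in a
# forked child, and collect the primes back through pipes.
def check_in_workers(numbers, worker_count)
  return [] if numbers.empty?

  slice_size = (numbers.size / worker_count.to_f).ceil
  readers = numbers.each_slice(slice_size).map do |slice|
    reader, writer = IO.pipe
    reader.binmode
    writer.binmode
    fork do
      # Child: do the work, write the marshalled result, and leave.
      reader.close
      writer.write(Marshal.dump(slice.select { |n| prime?(n) }))
      writer.close
      exit!(0)
    end
    # Parent: keep only the read end of this pipe.
    writer.close
    reader
  end

  # Read results in slice order, then reap the children.
  primes = readers.flat_map do |reader|
    result = Marshal.load(reader.read)
    reader.close
    result
  end
  Process.waitall
  primes
end

p check_in_workers((10_000...10_020).to_a, 3)
```

The queue-based version buys you what this sketch lacks: workers can live on other machines, and a slow worker simply takes fewer requests instead of holding up its whole slice.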