AMQP cake with Ruby: Using Ruby and RabbitMQ together

04 August 2009

There are a bunch of different ways to get connected to RabbitMQ. Thankfully the community has somewhat rallied behind the message queue system, and there is a growing number of clients and libraries to help you use the queue’s functionality.

In this post, we’ll explore the different options for connecting a Ruby program to the AMQP-driven queue server. So, with no further ado:

AMQP: the grandfather

Installing

gem sources -a http://gems.github.com
sudo gem install tmm1-amqp
# OR, for the developers in the crowd
git clone git://github.com/tmm1/amqp.git
rake gem
sudo gem install amqp-*.gem

Now that we’ve got it installed, let’s write a quick example to get connected:

require "rubygems"
require 'mq'

EM.run {
  AMQP.start(:host => 'localhost') do
    amq = MQ.new
    EM.add_periodic_timer(1) { amq.queue("noises").publish("moo") }
  end
}

It does exactly what it looks like: we use EventMachine to provide a periodic timer. Each time the timer fires (once a second here), the block runs; in our case it pushes the message “moo” to the queue named “noises” on the RabbitMQ host (in this example, localhost).

Now let’s consume a message. This is nearly straight from the examples/ directory of the tmm1-amqp library:

require "rubygems"
require 'mq'

AMQP.start(:host => 'localhost') do
  MQ.queue("noises").publish("moo")
  MQ.queue("noises").publish("quack")
  MQ.queue("noises").publish("bark")
  
  i = 0
  
  MQ.queue('noises').subscribe(:ack => true) do |h,m|
    if (i+=1) == 3
      puts 'Shutting down...'
      AMQP.stop{ EM.stop }
    end

    if AMQP.closing?
      puts "#{m} (ignored, redelivered later)"
    else
      puts m
      h.ack
    end
  end
end  

# moo
# quack
# Shutting down...
# bark (ignored, redelivered later)

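In this example, we’ve pushed a few sounds to the noises queue and then subscribed to it, asking to acknowledge each message ourselves (:ack => true). The first two messages are acked; on the third we start shutting down, so “bark” is never acked and will be redelivered later.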
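To make the redelivery behaviour a bit more concrete, here is a minimal in-memory sketch of what acknowledgements mean. ToyQueue and its methods (deliver, ack, consumer_gone) are hypothetical names invented for this illustration; they are not part of the amqp gem or of RabbitMQ, and a real broker does far more than this.

```ruby
# Toy in-memory model of acknowledgements; NOT the amqp gem's API.
class ToyQueue
  def initialize
    @ready = []      # messages waiting to be delivered
    @unacked = {}    # delivery tag => message handed out but not yet acked
    @next_tag = 0
  end

  def publish(msg)
    @ready << msg
  end

  # Hand the next message to a consumer and remember it until it is acked.
  def deliver
    return nil if @ready.empty?
    tag = (@next_tag += 1)
    @unacked[tag] = @ready.shift
    [tag, @unacked[tag]]
  end

  def ack(tag)
    @unacked.delete(tag)
  end

  # When the consumer goes away, anything still unacked returns to the queue.
  def consumer_gone
    @ready = @unacked.values + @ready
    @unacked.clear
  end

  def ready
    @ready.dup
  end
end

queue = ToyQueue.new
%w[moo quack bark].each { |noise| queue.publish(noise) }

# Process and acknowledge the first two messages.
2.times do
  tag, msg = queue.deliver
  puts msg
  queue.ack(tag)
end

# "bark" is delivered but never acked, so it goes back on the queue.
queue.deliver
queue.consumer_gone
p queue.ready
```

The point of the sketch is only that a message stays the broker's responsibility until it is acked, which is why “bark” shows up again the next time a consumer subscribes.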

There are alternative methods to pull a message off the queue. For instance, you can use pop (if you don’t need an ack of the message):

queue.pop{ |msg|
  unless msg
    # queue was empty
    p [Time.now, :queue_empty!]

    # try again in 1 second
    EM.add_timer(1){ queue.pop }
  else
    # process this message
    p [Time.now, msg]

    # get the next message in the queue
    queue.pop
  end
}

Up until now, we’ve only used one exchange type: direct. Now, there are N+1 ways to skin a cat, so let’s look at sending a message to everyone who is subscribed to a fanout. First off, to understand how this works, let’s define a fanout. A fanout in RabbitMQ terminology is like a direct exchange, except that it represents a 1:N message delivery pattern. We need to define one more term, a binding; you’ll see why in just a minute. From the RabbitMQ documentation:

When you publish a message, you send a "routing key" along with it, that's used by the exchange when it decides which queues to forward a copy of the message on to. The links between exchanges and queues are created through binding, with a "binding pattern" that is used by the exchange when comparing against routing keys.

Now that we’ve got that under control, let’s actually write one. Again, this is taken nearly directly from examples/mq/clock.rb, if you’d like to look at a more complete example.

require "rubygems"
require "mq"

AMQP.start(:host => 'localhost') do
  def log *args
    p args
  end
  
  clock = MQ.new.fanout('clock')
  EM.add_periodic_timer(1){
    log :publishing, time = Time.now
    clock.publish(Marshal.dump(time))
  }

  amq = MQ.new
  amq.queue('every second').bind(amq.fanout('clock')).subscribe{ |time|
    log 'every second', :received, Marshal.load(time)
  }
  
  amq = MQ.new
  amq.queue('every 5 seconds').bind(amq.fanout('clock')).subscribe{ |time|
    time = Marshal.load(time)
    log 'every 5 seconds', :received, time if time.strftime('%S').to_i%5 == 0
  }
end

Okay, so first off, we created a new fanout and, using MQ and EventMachine again, a periodic timer that publishes to the clock fanout (again, 1:N communication). Then we bind each queue to the fanout named “clock” (recall that a binding is just a link between a queue and its “router”, the exchange) and subscribe, so the block runs whenever the fanout delivers a new message to the queue. Notice that we take two differently named queues (one called “every second” and the other “every 5 seconds”) and bind them to the same fanout. This works because the fanout provides 1:N delivery.

Now, notice that the second subscriber only logs when the seconds value mod 5 is zero, or essentially every 5 seconds. However, if we remove that if condition, you’ll see that the queue actually receives a message every second. This is because the fanout delivers each message to EVERY queue that is bound to it. That’s how the fanout works.
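The 1:N behaviour can be sketched without a broker at all. The ToyFanout class below is a hypothetical in-memory stand-in written for this post, not the amqp gem's API; it only illustrates that every bound queue gets its own copy of each message and that any filtering happens on the consumer side.

```ruby
# Toy in-memory model of a fanout exchange; NOT the amqp gem's API.
class ToyFanout
  def initialize
    @queues = {}   # queue name => array of delivered messages
  end

  # Binding a queue registers it for every future message.
  def bind(queue_name)
    @queues[queue_name] ||= []
  end

  # A fanout ignores routing keys: each bound queue gets its own copy.
  def publish(message)
    @queues.each_value { |messages| messages << message }
  end

  def messages_for(queue_name)
    @queues.fetch(queue_name)
  end
end

clock = ToyFanout.new
clock.bind('every second')
clock.bind('every 5 seconds')

# Publish six "ticks", using plain integers in place of Time objects.
(0..5).each { |second| clock.publish(second) }

every_second = clock.messages_for('every second')
all_ticks    = clock.messages_for('every 5 seconds')
# Both queues received all six ticks; the mod-5 filter is the consumer's job.
every_fifth  = all_ticks.select { |second| second % 5 == 0 }

p every_second
p all_ticks.size
p every_fifth
```

Here both queues end up holding the same six ticks, and only the consumer-side select narrows the second one down.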

Now, the AMQP library provides something really kind of neat. You can send “RPC” calls across the wire to the queue. How? Let’s look at another example from the examples/ directory:

require "rubygems"
require 'mq'

AMQP.start(:host => 'localhost') do

  def log *args
    p args
  end

  class HashTable < Hash
    def get(key)
      self[key]
    end
    
    def set(key, value)
      self[key] = value
    end
  end

  server = MQ.new.rpc('hash table node', HashTable.new)

  client = MQ.new.rpc('hash table node')
  client.set(:now, time = Time.now)
  client.get(:now) do |res|
    log 'client', :now => res, :eql? => res == time
  end

  client.set(:one, 1)
  client.get :one do |res| 
    p res
  end
  client.keys do |res|
    log 'client', :keys => res
    AMQP.stop{ EM.stop }
  end

end

Sweetness. The HashTable object itself stays with the server; thanks to the amqp gem, each method call and its arguments travel through the queue in Marshalled format, and the result comes back the same way. When the reply arrives, we get a full Ruby object back.
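As a rough illustration of the idea, here is a sketch that round-trips method calls through Marshal with no broker involved. The helper names encode_call and handle_call are made up for this example, and the real gem also has to deal with reply queues and matching replies to requests, which is left out here; treat it as an assumption about the general approach rather than a description of the gem's internals.

```ruby
# Simplified illustration of RPC over a queue; no broker, no amqp gem.
class HashTable < Hash
  def get(key)
    self[key]
  end

  def set(key, value)
    self[key] = value
  end
end

# "Client" side: turn a method call into bytes that could travel in a message.
def encode_call(method_name, *args)
  Marshal.dump([method_name, *args])
end

# "Server" side: decode the call, run it on the real object, encode the result.
def handle_call(target, payload)
  method_name, *args = Marshal.load(payload)
  Marshal.dump(target.__send__(method_name, *args))
end

table = HashTable.new   # lives only on the "server" side

handle_call(table, encode_call(:set, :one, 1))
reply = Marshal.load(handle_call(table, encode_call(:get, :one)))
keys  = Marshal.load(handle_call(table, encode_call(:keys)))

p reply
p keys
```

Only the byte strings produced by Marshal.dump would cross the wire; the table itself never leaves the server side.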

A few other notes.

We can create a topic exchange, which is kind of like a fanout, except that each queue only receives the messages whose routing key matches its binding, so instead of 1:N, it’s N:N. To do this, we’ll write a mini chat server (well, we’ll pull it from the very smart blog: http://www.randomhacks.net/articles/2009/05/08/chat-client-ruby-amqp-eventmachine-shoes). Some more quick overhead before we get to the code. When we created the binding before, we set a :key that was equivalent to the name of the queue. We can do better than that and set it explicitly to the chat channel via the bind method:

queue.bind('chat', :key => $channel)

This binds our queue to the chat exchange so that it receives any message whose routing key matches the name of the channel in the chat program. You can also use wildcards and arrays in the key parameter. Now, when we publish a message, we’ll use the routing key to ensure it gets sent to the right bindings:

MQ.topic('chat').publish(" -> #{data}", :routing_key => $channel)
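For the wildcard case, here is a small sketch of how a topic exchange compares a binding key with a routing key under the AMQP rules: keys are dot-separated words, “*” stands for exactly one word and “#” for zero or more words. The functions topic_match? and match_words are hypothetical helpers written for this post, and the lang.* keys are made-up examples; the broker does this matching for you.

```ruby
# Sketch of AMQP topic matching; the broker does this itself in practice.
def topic_match?(pattern, routing_key)
  match_words(pattern.split('.'), routing_key.split('.'))
end

def match_words(pattern, words)
  # An exhausted pattern only matches an exhausted routing key.
  return words.empty? if pattern.empty?

  head, *rest = pattern
  case head
  when '#'
    # "#" may swallow zero words, or one word and then try again.
    match_words(rest, words) ||
      (!words.empty? && match_words(pattern, words[1..-1]))
  when '*'
    # "*" must consume exactly one word.
    !words.empty? && match_words(rest, words[1..-1])
  else
    # A literal word must match exactly.
    !words.empty? && words.first == head && match_words(rest, words[1..-1])
  end
end

p topic_match?('ruby', 'ruby')
p topic_match?('lang.*', 'lang.ruby')
p topic_match?('lang.*', 'lang.ruby.amqp')
p topic_match?('lang.#', 'lang.ruby.amqp')
```

In the chat program the binding key is just the channel name with no wildcards, so only the exact-match branch is exercised.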

Now, on to the entire program:

require 'rubygems'
gem 'amqp'
require 'mq'

unless ARGV.length == 2
  STDERR.puts "Usage: #{$0} <channel> <nick>"
  exit 1
end
$channel, $nick = ARGV

AMQP.start(:host => 'localhost') do
  $chat = MQ.topic('chat')

  # Print any messages on our channel.
  queue = MQ.queue($nick)
  queue.bind('chat', :key => $channel)
  queue.subscribe do |msg|
    if msg.index("#{$nick}:") != 0
      puts msg
    end
  end

  # Forward console input to our channel.
  module KeyboardInput
    include EM::Protocols::LineText2
    def receive_line data
      $chat.publish("#{$nick}: #{data}",
                    :routing_key => $channel)
    end
  end
  EM.open_keyboard(KeyboardInput)
end

Last example of the day: We can distribute work in our programs as well.

require "rubygems"
require 'mq'

MAX = 1000

# logging
def log *args
  p args
end

# spawn workers
workers = ARGV[0] ? (Integer(ARGV[0]) rescue 1) : 1
AMQP.fork(workers) do

  log MQ.id, :started

  class Fixnum
    def prime?
      ('1' * self) !~ /^1?$|^(11+?)\1+$/
    end
  end

  class PrimeChecker
    def is_prime? number
      log "prime checker #{MQ.id}", :prime?, number
      number.prime?
    end
  end

  MQ.rpc('prime checker', PrimeChecker.new)

end

# use workers to check which numbers are prime
AMQP.start(:host => 'localhost') do
  
  prime_checker = MQ.rpc('prime checker')

  (10_000...(10_000+MAX)).each do |num|
    log :checking, num

    prime_checker.is_prime?(num) { |is_prime|
      log :prime?, num, is_prime
      (@primes||=[]) << num if is_prime
      
      if (@responses = (@responses || 0) + 1) == MAX
        log :primes=, @primes
        EM.stop_event_loop
      end
    }

  end
  
end

By using the fork method in AMQP, we fork several worker processes, each consuming messages from the ‘prime checker’ RPC queue. This shows just how easy it is to spread the consumers of an AMQP queue across multiple processes.
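As an aside, the prime? method in the worker relies on a regular-expression trick that is worth seeing in isolation. The sketch below restates it as a standalone method; prime_by_regex? is a name made up here, and it is defined as a plain method instead of a Fixnum monkey patch so that it also runs on newer Rubies.

```ruby
# The unary-regex primality check from the worker, as a plain method.
def prime_by_regex?(number)
  # '1' * number writes the number in unary.
  # ^1?$ matches 0 and 1, which are not prime.
  # ^(11+?)\1+$ matches any string that splits into two or more equal
  # groups of at least two 1s, i.e. a composite number.
  ('1' * number) !~ /^1?$|^(11+?)\1+$/
end

primes = (1..30).select { |n| prime_by_regex?(n) }
p primes
```

It is a deliberately slow way to test primality, which makes it a handy stand-in for expensive work that you would want to farm out to several workers.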