Queues, PubSub and RPC: From Ruby to Elixir ... and back

In this blog post I will describe three ways of using Elixir to process "requests" from a Ruby application.

To do so, I have created a simple Sinatra app whose endpoints interface with an Elixir app; the Elixir app implements the worker responsible for retrieving tweets given a search string and the desired number of tweets.

The endpoints on the Sinatra app are:

  • POST /api/v1/pubsub/twitter
  • POST /api/v1/sidekiq/twitter
  • POST /api/v1/rpc/twitter
  • GET /api/twitter/:uuid

Here are some example requests and responses:

# pubsub request
http://localhost:4567/api/v1/pubsub/twitter
{"search": "myelixirstatus", "number": 20}

# response
{
  "uuid": "1bab11d8-f4f9-45c3-ace5-cffc907135b2",
  "search_string": "myelixirstatus",
  "number_of_tweets": 20,
  "requested_at": "2015-11-18 17:48:22 +0000"
}

# sidekiq request
http://localhost:4567/api/v1/sidekiq/twitter
{"search": "myelixirstatus", "number": 20}

# response
{
  "uuid": "9bfa7f79-536d-4e12-912c-4bc93b9c0625",
  "search_string": "myelixirstatus",
  "number_of_tweets": 20,
  "requested_at": "2015-11-18 17:48:57 +0000"
}

# rpc request
http://localhost:4567/api/v1/rpc/twitter  
{"search":"onfido", "number": 20}

# response
# try it out ;)

Sidekiq, Sinatra and Elixir running

The first two endpoints trigger async requests: they immediately return the uuid of the request and then hand the actual tweet fetching over to the Elixir app through Redis, either via PubSub or via Sidekiq. The fetched tweets are stored in a file inside the tweet_store folder.

The third endpoint is sync; it will block and wait for the reply from the Elixir app.

The code for both Ruby and Elixir applications can be found here.

The only file not under version control is fetch_it_workers/config/secrets.exs. This file holds my Twitter credentials. If you haven't yet registered a Twitter app, you can do so here.

The secrets.exs file format is the following:

use Mix.Config

config :extwitter, :oauth, [
  consumer_key: "<CONSUMER KEY>",
  consumer_secret: "<CONSUMER SECRET>",
  access_token: "<ACCESS TOKEN>",
  access_token_secret: "<ACCESS SECRET>"
]
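The TwitterClient worker itself lives in the repo, but with extwitter configured this way the fetching presumably boils down to a call to ExTwitter.search/2. A minimal sketch of that idea (the module name and the mapping over :text are mine, for illustration only):

# rough sketch of fetching tweets with extwitter (illustrative, not the repo's TwitterClient)
defmodule TweetFetchSketch do
  def fetch(search_string, number_of_tweets) do
    ExTwitter.search(search_string, count: number_of_tweets)
    |> Enum.map(&(&1.text))
  end
end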


PubSub

Triggering the Elixir worker to fetch tweets using PubSub

Ruby

post "/pubsub/twitter" do  
  params = JSON.parse(request.body.read)
  data = Request.new.data(params)
  $redis.publish("workers", data.to_json)
  json data
end  

After parsing the params, we just need to publish the payload on a Redis Pub/Sub channel named workers.

In the Elixir app, the code responsible for listening to any messages published on the workers channel is the following:

defmodule FetchItWorkers.RedisPubSub do  
  use GenServer

  @redis_sub_channel "workers"

  def start_link do
    GenServer.start_link(__MODULE__, [], name: :redis_pub_sub)
  end

  def init(_args) do
    {:ok, %{pub_sub_conn: nil}, 0}
  end

  def handle_info(:timeout, _state) do
    # the 0 timeout returned from init/1 fires immediately, so the Redis
    # connection and the channel subscription are deferred to this callback
    {:ok, pub_sub_conn} = Redix.PubSub.start_link()
    :ok = Redix.PubSub.subscribe(pub_sub_conn, @redis_sub_channel, self())

    {:noreply, %{pub_sub_conn: pub_sub_conn}}
  end

  def handle_info({:redix_pubsub, :subscribe, _channel, _}, state) do
    {:noreply, state}
  end

  def handle_info({:redix_pubsub, :message, message, _channel}, state) do

    {:ok, decoded} = Poison.decode(message)

    IO.puts "Handling PubSub Job (#{decoded["uuid"]})"

    tweets = FetchItWorkers.TwitterClient.fetch_tweets(:twitter_worker, decoded["search_string"], decoded["number_of_tweets"])
    FetchItWorkers.Filestore.store_tweets(decoded["uuid"], tweets)

    {:noreply, state}
  end
end  

The module implements a GenServer. Besides the standard name registration, the init function returns {:ok, %{pub_sub_conn: nil}, 0}. The interesting thing here is that we are returning the initial state of the GenServer together with a timeout of 0. This timeout allows the server to finish initialising immediately, without waiting for the connection to Redis and the subscription to the channel to complete.
Since we deliberately force the server to time out, the connection to Redis happens in the handle_info/2 clause that pattern matches on :timeout.
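As a side note, the same deferral can be achieved without the zero timeout by having init/1 send a message to self() and finishing the setup when it arrives. A rough sketch of that variant, written as if it replaced init/1 and the :timeout clause in the module above:

  def init(_args) do
    # message ourselves instead of relying on a 0 timeout
    send(self(), :connect)
    {:ok, %{pub_sub_conn: nil}}
  end

  def handle_info(:connect, _state) do
    {:ok, pub_sub_conn} = Redix.PubSub.start_link()
    :ok = Redix.PubSub.subscribe(pub_sub_conn, @redis_sub_channel, self())
    {:noreply, %{pub_sub_conn: pub_sub_conn}}
  end

Both variants behave the same; here I stick with the zero-timeout version.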

The flow is the following:

  1. The GenServer starts and registers itself with a name.
  2. The init function is invoked and we immediately timeout.
  3. The timeout causes a message to be sent, and that message is captured by handle_info(:timeout, _state). The connection to Redis and the subscription to the channel take place here.
  4. Immediately after the subscription, redix will send back a message that will be picked up by handle_info({:redix_pubsub, :subscribe, _channel, _}, state).
  5. When a message is pushed onto the redis channel the GenServer will be notified via handle_info({:redix_pubsub, :message, message, _channel}, state). In here the twitter worker gets invoked, fetches the tweets and saves them in a file.
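To exercise the whole flow without going through Sinatra, you can publish a message on the channel yourself from iex. A small sketch, assuming Redis and the Elixir app are running locally and using a throwaway Redix connection; the payload mirrors the fields the handler reads:

{:ok, conn} = Redix.start_link()

payload = Poison.encode!(%{
  uuid: "manual-test",
  search_string: "myelixirstatus",
  number_of_tweets: 5
})

# the RedisPubSub GenServer picks this up and writes the tweets to tweet_store
Redix.command!(conn, ["PUBLISH", "workers", payload])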


Sidekiq

Triggering the Elixir worker to fetch tweets using Sidekiq.

Disclaimer: this implementation is heavily inspired by Benjamin Tan's blog post. The implementation on the Elixir side is a bit different in order to limit the number of spawned processes by using poolboy.

Ruby

post "/sidekiq/twitter" do  
  params = JSON.parse(request.body.read)
  data = Request.new.data(params)
  TwitterWorker.perform_async(data)
  json data
end

class TwitterWorker
  include Sidekiq::Worker

  def self.perform_async(*payload)
    queue = "queue:elixir"
    json = {
      queue: queue,
      class: "TwitterWorker",
      args: payload,
      jid: SecureRandom.hex(12),
      enqueued_at: Time.now.to_f
    }.to_json
    client = Sidekiq.redis { |conn| conn }
    client.lpush(queue, json)
  end

  # it will get the results processed by elixir
  def perform(*payload)
    puts "\tDO SOMETHING IN RUBY ?"
    puts "\t>>>> Received: #{payload}"
  end
end  

The Ruby code is quite standard. When the endpoint is hit, a Request object builds the data payload from the params and hands it to a Sidekiq worker.
The worker class overrides the perform_async method. The default queue used by Sidekiq is "queue:default"; here we use "queue:elixir" in order to prevent the Ruby worker from processing the job. After building the JSON payload, we push it onto the Redis list with LPUSH, and then it is time for Elixir to take over!
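For reference, once Poison decodes it on the Elixir side, the job pushed by perform_async looks roughly like this (the values are illustrative):

# shape of the decoded job map (illustrative values)
%{
  "queue" => "queue:elixir",
  "class" => "TwitterWorker",
  "jid" => "...",                # SecureRandom.hex(12) on the Ruby side
  "args" => [
    %{"uuid" => "...", "search_string" => "myelixirstatus",
      "number_of_tweets" => 20, "requested_at" => "..."}
  ],
  "enqueued_at" => 1447868902.0  # Time.now.to_f on the Ruby side
}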

Elixir

defmodule FetchItWorkers.RedisPoller do  
  use GenServer

  def start_link(redis_client) do
    GenServer.start_link(__MODULE__, [redis_client], [])
  end

  def poll(redis_client) do
    job = Redix.command!(redis_client, ~w(RPOP queue:elixir))
    new_job(job, redis_client)

    :timer.sleep(100)
    poll(redis_client)
  end

  def new_job(nil, _redis_client) do
    # No job to process ...
  end

  def new_job(job, redis_client) do
    :poolboy.transaction(
      :sidekiq_pool,
      fn(pid) -> GenServer.call(pid, {:run, job, redis_client}) end,
      :infinity
    )
  end

  def init([redis_client]) do
    # timeout to start polling
    {:ok, redis_client, 0}
  end

  def handle_info(:timeout, redis_client) do
    # kick off the (blocking) poll loop as soon as the GenServer has started
    poll(redis_client)
  end
end

defmodule FetchItWorkers.Sidekiq do
  use GenServer

  def start_link(_args) do
    :random.seed(:os.timestamp)
    GenServer.start_link(__MODULE__, [], [])
  end

  def run(pid, job, redis_client) do
    GenServer.call(pid, {:run, job, redis_client})
  end

  def handle_call({:run, job, redis_client}, _from, state) do

    {:ok, job} = Poison.decode(job)
    jid = job["jid"]
    args = Enum.into(job["args"] |> hd, %{})
    queue = "queue:default"
    class = "TwitterWorker"
    enqueued_at = job["enqueued_at"]

    IO.puts "Handling Sidekiq Job (#{jid})"

    tweets = FetchItWorkers.TwitterClient.fetch_tweets(:twitter_worker, args["search_string"], args["number_of_tweets"])
    FetchItWorkers.Filestore.store_tweets(args["uuid"], tweets)

    new_job = %{
      queue: queue, jid: jid, class: class,
      args: [jid, args["uuid"], args["search_string"], args["number_of_tweets"]],
      enqueued_at: enqueued_at
    } |> Poison.encode!

    # pass the command as a list so the JSON payload stays a single argument
    Redix.command(redis_client, ["LPUSH", queue, new_job])

    {:reply, :ok, state}
  end
end  

The Poller GenServer receives a Redis connection as argument (redis_client) and, upon initialisation (again via the zero-timeout trick), the poll function is invoked. This function issues a Redis command (RPOP) that pops the "oldest" job enqueued by the Ruby app.
But what if there is no job? Well, that is exactly why new_job pattern matches on two cases:

  • new_job(nil, redis_client)
  • new_job(job, redis_client)

If no job is pending, nothing will happen. However, if there is a job waiting to be processed, we get a worker from the configured pool and the tweets are fetched and saved in a file.

After that, the poll function sleeps for 100 ms and invokes itself.
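The :sidekiq_pool referenced in the transaction is a regular poolboy pool whose worker module is FetchItWorkers.Sidekiq. A minimal sketch of how such a pool can be declared when the application starts (the pool sizes are illustrative; the real configuration lives in the repo):

# somewhere in the application's supervision setup (sizes are illustrative)
pool_options = [
  name: {:local, :sidekiq_pool},
  worker_module: FetchItWorkers.Sidekiq,
  size: 5,
  max_overflow: 2
]

children = [
  :poolboy.child_spec(:sidekiq_pool, pool_options, [])
  # ... plus the Redis poller, the pub/sub listener, etc.
]

Supervisor.start_link(children, strategy: :one_for_one)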

Let's now look into the Sidekiq module that forms the worker pool. I'll focus on the handle_call function, since the rest of the module is quite standard.

The first argument of this function is the tuple {:run, job, redis_client}: job is the JSON payload retrieved from Redis, and redis_client is the pid of the Redis connection.

The JSON is parsed and the queue is set to "queue:default".
Wait! Isn't this the default queue used by Sidekiq? Indeed it is. So, after performing the fetch and save operations, we push the job again, this time onto the queue Sidekiq knows about... This "trick" allows us to chain background jobs, first in Elixir and then in Ruby. Imagine you wished to save the result in a DB using Ruby... Easy! You can just modify the TwitterWorker#perform method on the Ruby side, as it will pick up the payload we publish from the Elixir worker.


RPC

The previous examples were async operations. What if we want to block while waiting for a response from our Elixir app?
Well, I've been wanting to experiment with RPC for a while, so why not? While at it, I was also able to use MessagePack!

Ruby

post "/rpc/twitter" do  
  params = JSON.parse(request.body.read)
  search_string = params["search"]
  number_of_tweets = params["number"]

  service = BERTRPC::Service.new('localhost', 10001)
  # Elixir module names are atoms prefixed with Elixir., hence the send below
  response  = service.call.send(:'Elixir.FetchItWorkers.RPC').fetch_tweets([search_string, number_of_tweets])
  tweets = MessagePack.unpack(response)

  json tweets.map { |t| JSON.parse(t) }
end  

On the Ruby app, we start by parsing the parameters; we then initialise the RPC service, which in this example runs on localhost, port 10001.
If you take a close look at the code you may find this line a bit strange:

service.call.send(:'Elixir.FetchItWorkers.RPC').fetch_tweets([search_string, number_of_tweets])  

Normally, the rpc service call is made by using service.call.<module>.<function>([<arguments>])

That works well when the RPC service is implemented in Erlang, where module names are plain lowercase atoms (symbols, in Ruby terms). Elixir modules, however, are defined with upper-case names that compile down to atoms prefixed with Elixir., in this case :'Elixir.FetchItWorkers.RPC', which cannot be written as a plain method call in Ruby.
This is the reason I am "sending" it!
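You can check in iex that an Elixir module name is really just an atom carrying the Elixir. prefix:

iex> FetchItWorkers.RPC == :"Elixir.FetchItWorkers.RPC"
true
iex> Atom.to_string(FetchItWorkers.RPC)
"Elixir.FetchItWorkers.RPC"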

After sending the request, execution blocks; upon receiving the response from the Elixir app, it gets unpacked and sent back to the client.

Elixir

defmodule FetchItWorkers.RPC do

  def fetch_tweets(message) when is_list(message) do
    IO.puts "Handling RPC Request ..."

    [search_string, number_of_tweets] = message

    # fetch the tweets, convert each charlist to a binary and pack the result with MessagePack
    FetchItWorkers.TwitterClient.fetch_tweets(:twitter_worker, search_string, number_of_tweets)
    |> Enum.map(&List.to_string/1)
    |> MessagePack.pack!
  end
end  

The Elixir code for the RPC service is actually the simplest of the three examples!

Apart from the RPC server initialisation and registration of the handler module when the Elixir application boots ...

(...)
number_acceptors = 100
port = 10001
handlers = [FetchItWorkers.RPC]
{:ok, _pid} = :aberth.start_server(number_acceptors, port, handlers)
(...)

... there is not much to say, as the handler module just calls the worker to fetch the tweets, maps the response and sends it back encoded as MessagePack...

I had real fun exploring these three approaches. In production, I would probably go with a more battle-tested solution like RabbitMQ. With it, you can even emulate the sync behaviour by using RPC.

There are also a few libraries in the Elixir world you can take a look at, for example toniq and exq. The latter is even compatible with Resque and Sidekiq.


