Thread-based ruby Job Queue with somewhat tight coupling to Rails

cpm/job_queue

This is a simple job queue based on ruby's green threads that is relatively tightly coupled with Rails.

This has been used in production for a significant amount of time, but it isn't well packaged for people to use in their applications quite yet. It should be repackaged as a gem and/or plugin.

Right now I just wanted to put the code up as motivation to do the right thing later on. I've been sitting on it for far too long.

Here's the gist:

Suppose you need your webapp to do something that might take an arbitrary amount of time. For example, processing credit cards. Regardless of whether you're using mongrel, passenger, or something else altogether, application processes are precious and your application will perform better if you return a response to the browser quickly.

That's where job queues come in. You have an outside process that is separate from your Rails/Rack processes. Your webapp tells the job queue to do the work, and the queue hands back an ID so the app can check on the progress later.

Meanwhile, your app returns with one of those nifty AJAX spinners or a "Please wait, refreshing" page. It refreshes periodically checking on that ID.

When the queue is done working, the app gets the results from the queue and displays them to the end user.
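
To make that lifecycle concrete, here is a minimal in-process sketch of the enqueue, check, and fetch steps. It is not the job_queue server: `TinyJobQueue`, `enqueue`, and `check` are hypothetical names invented for illustration, and the real queue runs as a separate process that the app talks to over a socket.

```ruby
require 'thread'

# Hypothetical stand-in for the queue server (not the code in this repo):
# it hands out integer IDs, runs each job in its own thread, and answers
# status checks.
class TinyJobQueue
  def initialize
    @lock    = Mutex.new
    @next_id = 0
    @jobs    = {} # id => { :status => :working or :done, :result => object }
  end

  # Start the block in a new thread and return the job's ID right away.
  def enqueue(&work)
    id = @lock.synchronize do
      @next_id += 1
      @jobs[@next_id] = { :status => :working, :result => nil }
      @next_id
    end
    Thread.new do
      result = begin
        work.call
      rescue StandardError => e
        e # hand the exception back as the result so the caller can re-raise it
      end
      @lock.synchronize { @jobs[id] = { :status => :done, :result => result } }
    end
    id
  end

  # Returns [:not_found], [:working], or [:done, result].
  def check(id)
    @lock.synchronize do
      job = @jobs[id]
      return [:not_found] if job.nil?
      job[:status] == :done ? [:done, job[:result]] : [:working]
    end
  end
end

queue  = TinyJobQueue.new
job_id = queue.enqueue { sleep 0.1; "card charged" }

# The webapp would do this check once per page refresh; here we just poll.
status = queue.check(job_id)
until status.first == :done
  sleep 0.05
  status = queue.check(job_id)
end
puts status.last
```

The three return shapes of `check` mirror the NOTFOUND, WORKING, and DONE responses that the controller code further down handles.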

job_queue_daemonizer.rb lives in /script/. You run it to fire up the server. It really should be replaced with a Rake task.

You fill /lib/job_workers with subclasses of JobWorker. Whatever a worker returns gets sent back to the controller.
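
As a rough idea of what a worker might look like, here is a sketch. The `JobWorker` class below is a stand-in so the example runs on its own, and the `perform` method name and the constructor arguments are assumptions; check the actual JobWorker in this repo for the real interface.

```ruby
# Stand-in base class so this sketch is self-contained. The real JobWorker
# ships with job_queue and its interface may differ.
class JobWorker
  def initialize(args = {})
    @args = args
  end
end

# Would live in lib/job_workers/example_worker.rb.
class ExampleWorker < JobWorker
  # `perform` is an assumed entry point name, not confirmed by the README.
  def perform
    sleep 0.1 # pretend to do something slow, like charging a card
    { :received => @args[:passed_to], :finished_at => Time.now }
  end
end

result = ExampleWorker.new(:passed_to => "The Worker").perform

# The controller code in this README reads results with Marshal.load, so
# whatever a worker returns has to survive a Marshal round trip.
round_trip = Marshal.load(Marshal.dump(result))
puts round_trip.inspect
```

Plain hashes, arrays, strings, numbers, and times are safe return values. Objects holding procs, sockets, or open files cannot be marshalled.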

Here's some example code of how you might use this in your controller:

class MyController < ApplicationController

  def index

    jobber(
      ExampleWorker,
      :params => { :passed_to => "The Worker" },
      :done => lambda {
        render :text => @processed.inspect
      }
    )
  end

  private
  def jobber(klass, jargs={})
    jargs[:params] ||= {}
    jargs[:done] ||= lambda {}
    jargs[:processing] ||= lambda do |next_url|
      flash.keep
      headers["Refresh"] = "1; URL=#{next_url}"
      @next_url = next_url
      render :template => "application/processing"
    end

    the_url = url_for(params)
    if params[:job_id].blank?
      jargs[:params] = jargs[:params].call if jargs[:params].respond_to?(:call)
      return if performed? # if we rendered something, quit!

      dumpee = {:worker => klass, :args => jargs[:params] }
      serialized_data = Marshal.dump( dumpee )

      TCPSocket.open('localhost', JobQueue::Config.port) do |sock|
        sock.print("QUEUE #{serialized_data.size}\n\n#{serialized_data}")
        params[:job_id] = sock.gets("\n\n").to_i
        the_url = url_for(params.merge(:job_id => params[:job_id], :n => (params[:n].to_i || 0)+1))
      end

      jargs[:processing].call(the_url)
      return
    end

    TCPSocket.open('localhost', JobQueue::Config.port) do |sock|
      sock.print("CHECK #{params[:job_id]}\n\n")
      resp = sock.gets("\n\n")
      case resp
      when /\ANOTFOUND/
        sock.close
        render :template => "application/job_queue_not_found"
      when /\AWORKING/
        jargs[:processing].call(the_url)
        return
      when /\ADONE (\d+)\n\n/
        serialized_data = sock.read($1.to_i)
        @processed = Marshal.load(serialized_data)
        raise @processed if @processed.kind_of?(Exception)
        @processed.each { |res| raise res if res.kind_of?(Exception) } if @processed.respond_to?(:each)

        jargs[:done].call
        return
      end
    end
  end
end
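
The wire format the controller speaks is simple enough to try without a server: a header line with a verb and a byte count, a blank line, then that many bytes of Marshal data. The sketch below frames and parses such a message against a StringIO. The helper names `frame` and `read_frame` are made up for illustration and are not part of job_queue.

```ruby
require 'stringio'

# Build a framed message: "<VERB> <byte count>\n\n<marshalled payload>".
# This is the same shape as the QUEUE request and DONE response above.
def frame(verb, object)
  payload = Marshal.dump(object)
  "#{verb} #{payload.bytesize}\n\n#{payload}"
end

# Read one framed message from an IO: the header up to the blank line,
# then exactly as many payload bytes as the header announced.
def read_frame(io)
  header = io.gets("\n\n")
  match  = /\A(\w+) (\d+)\n\n\z/.match(header)
  raise ArgumentError, "bad header: #{header.inspect}" unless match
  [match[1], Marshal.load(io.read(match[2].to_i))]
end

wire = StringIO.new(frame("DONE", { :charged => true, :amount => 1999 }))
verb, result = read_frame(wire)
puts "#{verb}: #{result.inspect}"
```

Sending the length first is what lets the reader call `read` with an exact byte count, since marshalled data can itself contain newlines.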


WARNINGS:

If you use this approach, there are some things to keep in mind. This server uses whatever threading the ruby implementation provides.

In MRI, you're talking about green threads. These are fine for ruby IO. However, if you're using C extensions to do something like resize massive images, the server may become unresponsive. That's because ruby's scheduler doesn't have a chance to schedule threads while running a C extension.

If you're in JRuby land, you're using real OS threads. Since this implementation uses one job request = one thread, I imagine it would be trivial to overload the scheduler.
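
One way to cap the thread count is a fixed-size pool that pulls jobs from a shared queue. This is a different technique from the one-thread-per-job design this server uses, sketched here only as a possible direction. `BoundedPool`, `submit`, and `shutdown` are hypothetical names.

```ruby
require 'thread'

# Hypothetical alternative to one-thread-per-job (not what job_queue does):
# a fixed number of worker threads pull jobs from a shared Queue.
class BoundedPool
  def initialize(size)
    @jobs    = Queue.new
    @threads = Array.new(size) do
      Thread.new do
        # A nil job is the shutdown signal for one worker.
        while (job = @jobs.pop)
          job.call
        end
      end
    end
  end

  # Queue a block; it runs as soon as a worker thread is free.
  def submit(&job)
    @jobs << job
  end

  # Let already-queued jobs finish, then stop every worker.
  def shutdown
    @threads.size.times { @jobs << nil }
    @threads.each(&:join)
  end
end

results = Queue.new
pool    = BoundedPool.new(3)
10.times { |n| pool.submit { results << n * n } }
pool.shutdown

squares = Array.new(results.size) { results.pop }.sort
puts squares.inspect
```

With a pool of 3, at most three jobs run at once no matter how many requests arrive. The rest wait in the queue instead of each getting an OS thread.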

That being said, if your bottleneck is ruby-land IO and you're using MRI, there shouldn't be any problems with this approach. 


TODO: 
Packaging. Documentation. License?
