Postgres-backed job queues with Ruby

A few lines of Ruby using the pg driver are a simple alternative to a job-queuing library. Each job queue is a database table, and each worker is a single Ruby file.

Depending on the queue's requirements, either polling or Postgres' LISTEN/NOTIFY may be appropriate. Each approach runs as its own process type in the Procfile:

queue_poll: bundle exec ruby queue_poll.rb
queue_listen: bundle exec ruby queue_listen.rb

To run one worker on Heroku:

heroku ps:scale queue_poll=1

Or:

heroku ps:scale queue_listen=1

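A queue can hold jobs of different types. However, if you don't want a backlog of one job type to delay jobs of another type, create one queue table and one worker per job type. Run only one worker per queue table: the workers don't lock rows, so two workers on the same table could run the same job twice.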

Poll

With a job_queue table...

-- One row per job. A job is pending until the worker sets worked_at and
-- records its result in status.
CREATE TABLE job_queue (
  -- PRIMARY KEY gives the worker's UPDATE ... WHERE id = $2 an index to use;
  -- a bare SERIAL column is not indexed.
  id SERIAL PRIMARY KEY,
  created_at timestamp DEFAULT now() NOT NULL,
  status text DEFAULT 'pending'::text NOT NULL,
  name text NOT NULL,
  data jsonb NOT NULL,
  worked_at timestamp
);

-- Serves the poller's "WHERE worked_at IS NULL ORDER BY created_at LIMIT 1"
-- query. It stays small because worked rows drop out of the partial index.
CREATE INDEX job_queue_pending_idx ON job_queue (created_at) WHERE worked_at IS NULL;

...the job worker could look like:

require "pg"
require "json"

require_relative "job_one"
require_relative "job_two"

# Runs the job described by a job_queue row (a hash with "name" and "data"
# keys, as returned by the SELECT below) and returns the status string to
# record for it.
#
# Unknown job names, and any StandardError raised while parsing the payload
# or running the job, become an "err: ..." status. Without the rescue, an
# exception would kill the worker while the job's worked_at is still NULL.
# That job would stay the oldest pending one, so after a restart it would be
# picked first and crash the worker again, blocking the queue for good.
def run_job(job)
  case job["name"]
  when "JobOne"
    JobOne.call(JSON.parse(job["data"]))
  when "JobTwo"
    JobTwo.call(JSON.parse(job["data"]))
  else
    "err: invalid job #{job["name"]}"
  end
rescue StandardError => e
  "err: #{e.class}: #{e.message}"
end

begin
  conn = PG.connect(ENV.fetch("DATABASE_URL"))

  interval = 10
  puts "Polling every #{interval} seconds when the queue is empty..."

  loop do
    # Oldest job that has not been worked yet.
    job = conn.exec(<<~SQL).first
      SELECT
        id,
        name,
        data
      FROM
        job_queue
      WHERE
        worked_at IS NULL
      ORDER BY
        created_at ASC
      LIMIT 1
    SQL

    # Sleep only when there is nothing to do. Sleeping before every poll
    # would cap throughput at one job per interval, so a backlog could never
    # catch up.
    unless job
      sleep interval
      next
    end

    t = Process.clock_gettime(Process::CLOCK_MONOTONIC)

    # NOTE(review): assumes JobOne/JobTwo return a non-nil status; nil would
    # violate job_queue.status NOT NULL and make the UPDATE below raise.
    status = run_job(job)

    conn.exec_params(<<~SQL, [status, job["id"]])
      UPDATE
        job_queue
      SET
        status = $1,
        worked_at = now()
      WHERE
        id = $2
    SQL

    elapsed = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - t).round(2)
    puts "#{elapsed}s job #{job["id"]}: #{status}"
  end
ensure
  conn&.close
end

Listen

With a job_queue table, NOTIFY function, and TRIGGER...

-- One row per job. Inserting a row queues the job; the worker sets
-- worked_at and records its result in status.
CREATE TABLE job_queue (
  -- PRIMARY KEY indexes id. The worker fetches and updates each notified
  -- job by id, which would otherwise be a sequential scan every time.
  id SERIAL PRIMARY KEY,
  created_at timestamp DEFAULT now() NOT NULL,
  status text DEFAULT 'pending'::text NOT NULL,
  name text NOT NULL,
  data jsonb NOT NULL,
  worked_at timestamp
);

-- Trigger function: publishes the id of the row just inserted on the
-- job_queued channel. The id is sent as text because a notification
-- payload is a string.
CREATE FUNCTION notify_job_queued() RETURNS TRIGGER AS $$
BEGIN
  PERFORM pg_notify('job_queued', NEW.id::text);
  -- The result of an AFTER trigger is ignored, so NULL is returned.
  RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- Calls notify_job_queued() once for every row inserted into job_queue,
-- after the insert, so each new job announces its id to listeners.
-- PostgreSQL 11+ also accepts EXECUTE FUNCTION here; PROCEDURE is the
-- older spelling, kept for compatibility.
CREATE TRIGGER on_job_queue
AFTER INSERT
ON job_queue
FOR EACH ROW
EXECUTE PROCEDURE notify_job_queued();

...the job worker could look like:

require "pg"
require "json"

require_relative "job_one"
require_relative "job_two"

# Runs the job described by a job_queue row (a hash with "name" and "data"
# keys, as returned by the SELECTs below) and returns the status string to
# record for it.
#
# Unknown job names, and any StandardError raised while parsing the payload
# or running the job, become an "err: ..." status. A failing job is then
# marked worked instead of killing the worker and leaving the job pending.
def run_job(job)
  case job["name"]
  when "JobOne"
    JobOne.call(JSON.parse(job["data"]))
  when "JobTwo"
    JobTwo.call(JSON.parse(job["data"]))
  else
    "err: invalid job #{job["name"]}"
  end
rescue StandardError => e
  "err: #{e.class}: #{e.message}"
end

# Runs one job row, records its status and completion time, and logs how
# long that took.
def work_job(conn, job)
  t = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  # NOTE(review): assumes JobOne/JobTwo return a non-nil status; nil would
  # violate job_queue.status NOT NULL and make the UPDATE below raise.
  status = run_job(job)

  conn.exec_params(<<~SQL, [status, job["id"]])
    UPDATE
      job_queue
    SET
      status = $1,
      worked_at = now()
    WHERE
      id = $2
  SQL

  elapsed = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - t).round(2)
  puts "#{elapsed}s job #{job["id"]}: #{status}"
end

begin
  conn = PG.connect(ENV.fetch("DATABASE_URL"))

  conn.exec "LISTEN job_queued"
  puts "Waiting on job_queued channel..."

  # A NOTIFY sent while nothing is listening (e.g. while the worker is down)
  # is never delivered, so first work off every job that is still pending.
  # LISTEN runs before this query, so any job inserted from now on is also
  # notified and picked up by the loop below.
  conn.exec(<<~SQL).each { |job| work_job(conn, job) }
    SELECT
      id,
      name,
      data
    FROM
      job_queue
    WHERE
      worked_at IS NULL
    ORDER BY
      created_at ASC
  SQL

  loop do
    conn.wait_for_notify do |_channel, _pid, job_id|
      # "worked_at IS NULL" skips jobs the catch-up pass already ran. A job
      # inserted between LISTEN and that query is both returned by it and
      # notified here.
      job = conn.exec_params(<<~SQL, [job_id]).first
        SELECT
          id,
          name,
          data
        FROM
          job_queue
        WHERE
          id = $1
          AND worked_at IS NULL
      SQL
      work_job(conn, job) if job
    end
  end
ensure
  # Ending the session drops its LISTEN registration, so UNLISTEN is not
  # needed. Running UNLISTEN here on a broken connection would raise, skip
  # close, and mask the original error.
  conn&.close
end

Enqueue

In either case, enqueue a job by INSERTing into the queue table.

require "pg"
require "json"

begin
  conn = PG.connect(ENV.fetch("DATABASE_URL"))

  # RETURNING reads back exactly the row this INSERT created. A separate
  # SELECT without WHERE/ORDER BY could return any row once the table holds
  # more than one job.
  job = conn.exec_params(<<~SQL, [{company_id: 1}.to_json]).first
    INSERT INTO job_queue (name, data)
    VALUES ('JobOne', $1)
    RETURNING data
  SQL

  # The jsonb column comes back as a JSON string, so parse it.
  puts JSON.parse(job["data"])["company_id"] # 1
ensure
  conn&.close
end