ruby / db

I wrap the pg Postgres driver with light methods instead of using an ORM.

Configuration

Add the pg gem:

gem "connection_pool"
gem "pg"

Configure the pool on process boot:

# config/puma.rb
on_worker_boot do
  DB.configure do |c|
    c.pool_size = workers * threads
    c.reap = true
  end
end

Wrapper

The DB class manages connection pooling and provides a simple interface:

# lib/db.rb
require "connection_pool"
require "pg"

class DB
  Config = Struct.new(:pool_size, :reap)

  class << self
    def configure
      @config = Config.new
      yield(@config)
    end

    def pool
      @pool ||= new(
        pool_size: @config.pool_size,
        reap: @config.reap
      )
    end
  end

  def initialize(pool_size: 1, reap: false)
    @pool = ConnectionPool.new(size: pool_size, timeout: 5) { build_pg_conn }

    if reap
      start_reaper_thread
    end
  end

  def exec(sql, params = [])
    @pool.with do |conn|
      iconn = InstrumentedConnection.new(conn, @env)
      iconn.exec(sql, params)
    end
  end

  def transaction
    @pool.with do |conn|
      iconn = InstrumentedConnection.new(conn, @env)
      iconn.transaction { yield(iconn) }
    end
  end

  def fuzzy_like(string)
    pattern = Regexp.union("%", "_")
    escape_char = "\\"
    escaped = string
      .gsub(pattern) { |x| [escape_char, x].join }
      .tr(" ", "%")

    "%#{escaped}%"
  end

  private def build_pg_conn
    conn = PG.connect(db_url_for(@env))

    map = PG::BasicTypeMapForResults.new(conn)
    map.default_type_map = PG::TypeMapAllStrings.new

    conn.type_map_for_results = map
    conn.type_map_for_queries = PG::BasicTypeMapForQueries.new(conn)

    conn
  end

  private def start_reaper_thread
    Thread.new do
      Thread.current.name = "db-reaper"

      loop do
        @pool.reap(300) { |conn| conn&.close }
        sleep 60
      end
    end
  end
end

Each connection is wrapped with instrumentation for observability:

# lib/instrumented_connection.rb
require "forwardable"
require "pg"
require "sentry-ruby"

class InstrumentedConnection
  extend Forwardable

  def_delegators :@conn, *(
    PG::Connection.public_instance_methods(false) - [
      :exec,
      :exec_params,
      :transaction
    ]
  )

  def initialize(conn, env)
    @conn = conn
    @env = env
  end

  def exec(sql, params = [])
    rows = []

    with_sentry(sql, params) do
      result = execute_query(sql, params)
      rows = result.to_a
    end

    rows
  rescue PG::ConnectionBad
    sleep 5 # give HA backup time to come online
    retry
  rescue => err
    if ["development", "test"].include?(@env)
      raise err
    else
      Sentry.capture_exception(err)
      []
    end
  end

  def transaction
    @conn.transaction { yield(self) }
  rescue PG::ConnectionBad
    sleep 5
    retry
  end

  private def execute_query(sql, params)
    if params.empty?
      @conn.exec(sql)
    else
      params = params.map { |p| p.is_a?(Hash) ? p.to_json : p }
      @conn.exec_params(sql, params)
    end
  end

  private def with_sentry(sql, params = [])
    tx = Sentry.get_current_scope&.get_span ||
      Sentry.get_current_scope&.get_transaction ||
      Sentry.start_transaction(name: "DB#exec")

    if tx
      tx.with_child_span(op: "db.sql.execute", description: sql) do |span|
        span.set_data("SQL Params", params)
        yield
      end
    else
      yield
    end
  end
end

Usage

In controllers, access the pool:

class ApplicationController < ActionController::Base
  private def db
    DB.pool
  end
end

class SuggestionsController < ApplicationController
  def new
    render json: Search::SuggestCompany.new(db).call(
      query: params[:query]
    )
  end
end

Pass db through initializers and use <<~SQL heredocs:

module Search
  class SuggestCompany
    def initialize(db)
      @db = db
    end

    def call(query:)
      @db.exec(<<~SQL, [@db.fuzzy_like(query)])
        SELECT
          companies.id,
          companies.name,
          companies.status
        FROM
          companies
        WHERE
          companies.name ILIKE $1
          OR companies.also_known_as ILIKE $1
        ORDER BY
          companies.score DESC
        LIMIT 50
      SQL
    end
  end
end

For scripts, instantiate directly:

if $0 == __FILE__
  require_relative "../db"
  pp Search::SuggestCompany.new(DB.new).call(query: "Data")
end

Background processes

Initialize DB connections in background processes:

# Each forked worker gets its own connection
children = workers.map do |worker|
  fork { worker.new(DB.new).poll }
end

See ruby / job queues and ruby / clock for complete examples.

Benefits

← All articles