ruby / workos

WorkOS provides SSO and AuthKit for OAuth flows.

I use a thin HTTP client for the four WorkOS endpoints I need. The bulk of the auth security work (JWT verification) happens through the jwt gem in my own code.

Client

Workos::Client:

require "http"
require "json"
require "securerandom"
require "uri"

module Workos
  class Client
    API_BASE = "https://api.workos.com"
    API_KEY = ENV.fetch("WORKOS_API_KEY")
    CLIENT_ID = ENV.fetch("WORKOS_CLIENT_ID")
    TIMEOUT = 5

    def authorization_url(organization:, redirect_uri:, state:)
      query = URI.encode_www_form(
        client_id: CLIENT_ID,
        organization: organization,
        redirect_uri: redirect_uri,
        response_type: "code",
        state: state
      )
      "#{API_BASE}/sso/authorize?#{query}"
    end

    def profile_and_token(code:)
      resp = HTTP.timeout(TIMEOUT).post("#{API_BASE}/sso/token", form: {
        client_id: CLIENT_ID,
        client_secret: API_KEY,
        code: code,
        grant_type: "authorization_code"
      })
      if resp.code / 100 != 2
        return [nil, resp.status.to_s]
      end

      [JSON.parse(resp.body.to_s), nil]
    end

    def audit_event(organization_id:, event:)
      resp = HTTP
        .auth("Bearer #{API_KEY}")
        .headers("Idempotency-Key" => SecureRandom.uuid)
        .timeout(TIMEOUT)
        .post("#{API_BASE}/audit_logs/events", json: {
          organization_id: organization_id,
          event: event
        })
      if resp.code / 100 != 2
        return [nil, resp.status.to_s]
      end

      [nil, nil]
    end

    def complete_authkit_flow(external_auth_id:, user:)
      resp = HTTP
        .auth("Bearer #{API_KEY}")
        .timeout(TIMEOUT)
        .post("#{API_BASE}/authkit/oauth2/complete", json: {
          external_auth_id: external_auth_id,
          user: user
        })
      if resp.code / 100 != 2
        return [nil, resp.status.to_s]
      end

      body = JSON.parse(resp.body.to_s)
      [body["redirect_uri"], nil]
    end
  end
end

The methods return [data, err] tuples. Production code adds Sentry logging on non-2xx responses and rescues for transient network exceptions; I've trimmed those here for clarity.

There are no retry loops. All calls happen inside the user's login request cycle. Auth codes are single-use, so retrying profile_and_token after a partial failure would compound the problem rather than recover. Audit logging is best-effort.

Login

The login handler builds an authorization URL and stashes a CSRF state token in the session:

class Login < PublicHandler
  def handle
    state = SecureRandom.hex(16)
    @session[:oauth_state] = state

    workos_auth_url = Workos::Client.new.authorization_url(
      organization: ENV.fetch("WORKOS_ORGANIZATION"),
      redirect_uri: "#{base_url}/sso",
      state: state
    )

    page "sso/login", workos_auth_url: workos_auth_url
  end
end

authorization_url builds a string. No HTTP request.

Callback

The callback verifies state, exchanges the code for a profile, and signs the user in:

class Callback < PublicHandler
  def handle
    state = @session.delete(:oauth_state)
    if params["state"].nil? || params["state"] != state
      flash_next(:error, "Forbidden")
      return redirect_to("/login")
    end
    if params["code"].nil?
      flash_next(:error, "Forbidden")
      return redirect_to("/login")
    end

    client = Workos::Client.new
    token, err = client.profile_and_token(code: params["code"])
    if err
      flash_next(:error, "SSO error. Try again.")
      return redirect_to("/login")
    end
    profile = token["profile"] || {}

    user = Users::ByEmail.new(db).call(profile["email"])
    if user == {}
      flash_next(:error, "Forbidden")
      return redirect_to("/login")
    end

    remember user["remember_token"]
    redirect_to(safe_return_to)
  end
end

Users::ByEmail queries WHERE active = true, so deactivated users can't log in.

Audit logs

After a successful login, write a WorkOS audit event. The Idempotency-Key header is required by the audit logs endpoint, so retries (here or upstream) are safe:

client.audit_event(
  organization_id: ENV.fetch("WORKOS_ORGANIZATION"),
  event: {
    action: "user_logged_in",
    occurred_at: Time.now.utc.iso8601,
    actor: {id: user["id"].to_s, type: "user", name: user["name"]},
    targets: [{id: user["id"].to_s, type: "user"}],
    context: {location: @req.ip, user_agent: @req.user_agent}
  }
)

The handler ignores the returned error. A missing audit log shouldn't block a login.

AuthKit bridge

The same Login and Callback handlers also serve an MCP AuthKit flow. When a client starts OAuth with AuthKit, AuthKit redirects to GET /login?external_auth_id=xxx.

If the user is signed in, stash the value in the session and render a consent page:

class Login < PublicHandler
  def handle
    external_auth_id = params["external_auth_id"].to_s.strip
    valid = external_auth_id != "" &&
      external_auth_id.match?(/\A[a-zA-Z0-9_-]{1,255}\z/)

    if current_user["id"] && valid
      @session[:external_auth_id] = external_auth_id
      return page "sso/authkit_confirm", title: "Connect MCP"
    end
    # ... otherwise, normal SSO flow
  end
end

A POST from the consent page completes the bind:

class Confirm < Handler
  include AuthKitCompletion

  def handle
    external_auth_id = @session.delete(:external_auth_id).to_s
    if external_auth_id == ""
      flash_next(:error, "Session expired. Try again.")
      return redirect_to("/login")
    end

    complete_authkit_flow(external_auth_id, {
      id: current_user["id"].to_s,
      email: current_user["email"]
    })
  end
end

Without the consent step, an attacker could email a logged-in victim a link with ?external_auth_id=ATTACKER_ID and silently bind their AuthKit identity to the victim's account on click. A CSRF-protected POST blocks that.

For guests, the callback path stashes external_auth_id in the session and completes AuthKit there instead of ending the request with a logged-in session. State already protects that path.

The completion call returns an AuthKit-issued redirect URL. Check the host before redirecting:

def complete_authkit_flow(external_auth_id, user_payload)
  redirect_uri, err = Workos::Client.new.complete_authkit_flow(
    external_auth_id: external_auth_id,
    user: user_payload
  )
  if err
    flash_next(:error, "MCP authentication error. Try again.")
    return redirect_to("/login")
  end

  uri = URI.parse(redirect_uri.to_s)
  if uri.scheme != "https" || !uri.host&.end_with?(".workos.com", ".authkit.app")
    flash_next(:error, "MCP authentication error. Try again.")
    return redirect_to("/login")
  end

  [303, {"Location" => redirect_uri}, []]
end

The host check is defense in depth against open redirects.

Tests

Tests run on a custom framework. Stub the HTTP boundary with webmock:

stub_request(:post, "https://api.workos.com/sso/token")
  .to_return(
    status: 200,
    body: JSON.generate({
      profile: {email: "[email protected]", first_name: "U", last_name: "X"}
    })
  )

← All articles