ruby / workos
WorkOS provides SSO and AuthKit for OAuth flows.
I use a thin HTTP client for the four WorkOS endpoints I need. The bulk of the auth security work (JWT verification) happens through the jwt gem in my own code.
Client
Workos::Client:
require "http"
require "json"
require "securerandom"
require "uri"
module Workos
class Client
API_BASE = "https://api.workos.com"
API_KEY = ENV.fetch("WORKOS_API_KEY")
CLIENT_ID = ENV.fetch("WORKOS_CLIENT_ID")
TIMEOUT = 5
def authorization_url(organization:, redirect_uri:, state:)
query = URI.encode_www_form(
client_id: CLIENT_ID,
organization: organization,
redirect_uri: redirect_uri,
response_type: "code",
state: state
)
"#{API_BASE}/sso/authorize?#{query}"
end
def profile_and_token(code:)
resp = HTTP.timeout(TIMEOUT).post("#{API_BASE}/sso/token", form: {
client_id: CLIENT_ID,
client_secret: API_KEY,
code: code,
grant_type: "authorization_code"
})
if resp.code / 100 != 2
return [nil, resp.status.to_s]
end
[JSON.parse(resp.body.to_s), nil]
end
def audit_event(organization_id:, event:)
resp = HTTP
.auth("Bearer #{API_KEY}")
.headers("Idempotency-Key" => SecureRandom.uuid)
.timeout(TIMEOUT)
.post("#{API_BASE}/audit_logs/events", json: {
organization_id: organization_id,
event: event
})
if resp.code / 100 != 2
return [nil, resp.status.to_s]
end
[nil, nil]
end
def complete_authkit_flow(external_auth_id:, user:)
resp = HTTP
.auth("Bearer #{API_KEY}")
.timeout(TIMEOUT)
.post("#{API_BASE}/authkit/oauth2/complete", json: {
external_auth_id: external_auth_id,
user: user
})
if resp.code / 100 != 2
return [nil, resp.status.to_s]
end
body = JSON.parse(resp.body.to_s)
[body["redirect_uri"], nil]
end
end
end
The methods return [data, err] tuples. Production code adds
Sentry logging on non-2xx responses and rescues for transient
network exceptions; I've trimmed those here for clarity.
There are no retry loops. All calls happen inside the user's
login request cycle. Auth codes are single-use, so retrying
profile_and_token after a partial failure would compound the
problem rather than recover. Audit logging is best-effort.
Login
The login handler builds an authorization URL and stashes a CSRF state token in the session:
class Login < PublicHandler
def handle
state = SecureRandom.hex(16)
@session[:oauth_state] = state
workos_auth_url = Workos::Client.new.authorization_url(
organization: ENV.fetch("WORKOS_ORGANIZATION"),
redirect_uri: "#{base_url}/sso",
state: state
)
page "sso/login", workos_auth_url: workos_auth_url
end
end
authorization_url builds a string. No HTTP request.
Callback
The callback verifies state, exchanges the code for a profile, and signs the user in:
class Callback < PublicHandler
def handle
state = @session.delete(:oauth_state)
if params["state"].nil? || params["state"] != state
flash_next(:error, "Forbidden")
return redirect_to("/login")
end
if params["code"].nil?
flash_next(:error, "Forbidden")
return redirect_to("/login")
end
client = Workos::Client.new
token, err = client.profile_and_token(code: params["code"])
if err
flash_next(:error, "SSO error. Try again.")
return redirect_to("/login")
end
profile = token["profile"] || {}
user = Users::ByEmail.new(db).call(profile["email"])
if user == {}
flash_next(:error, "Forbidden")
return redirect_to("/login")
end
remember user["remember_token"]
redirect_to(safe_return_to)
end
end
Users::ByEmail queries WHERE active = true, so deactivated
users can't log in.
Audit logs
After a successful login, write a WorkOS audit event. The
Idempotency-Key header is required by the audit logs endpoint,
so retries (here or upstream) are safe:
client.audit_event(
organization_id: ENV.fetch("WORKOS_ORGANIZATION"),
event: {
action: "user_logged_in",
occurred_at: Time.now.utc.iso8601,
actor: {id: user["id"].to_s, type: "user", name: user["name"]},
targets: [{id: user["id"].to_s, type: "user"}],
context: {location: @req.ip, user_agent: @req.user_agent}
}
)
The handler ignores the returned error. A missing audit log shouldn't block a login.
AuthKit bridge
The same Login and Callback handlers also serve an
MCP AuthKit flow. When a client starts OAuth with
AuthKit, AuthKit redirects to GET /login?external_auth_id=xxx.
If the user is signed in, stash the value in the session and render a consent page:
class Login < PublicHandler
def handle
external_auth_id = params["external_auth_id"].to_s.strip
valid = external_auth_id != "" &&
external_auth_id.match?(/\A[a-zA-Z0-9_-]{1,255}\z/)
if current_user["id"] && valid
@session[:external_auth_id] = external_auth_id
return page "sso/authkit_confirm", title: "Connect MCP"
end
# ... otherwise, normal SSO flow
end
end
A POST from the consent page completes the bind:
class Confirm < Handler
include AuthKitCompletion
def handle
external_auth_id = @session.delete(:external_auth_id).to_s
if external_auth_id == ""
flash_next(:error, "Session expired. Try again.")
return redirect_to("/login")
end
complete_authkit_flow(external_auth_id, {
id: current_user["id"].to_s,
email: current_user["email"]
})
end
end
Without the consent step, an attacker could email a logged-in
victim a link with ?external_auth_id=ATTACKER_ID and silently
bind their AuthKit identity to the victim's account on click.
A CSRF-protected POST blocks that.
For guests, the callback path stashes external_auth_id in the
session and completes AuthKit there instead of ending the
request with a logged-in session. State already protects that
path.
The completion call returns an AuthKit-issued redirect URL. Check the host before redirecting:
def complete_authkit_flow(external_auth_id, user_payload)
redirect_uri, err = Workos::Client.new.complete_authkit_flow(
external_auth_id: external_auth_id,
user: user_payload
)
if err
flash_next(:error, "MCP authentication error. Try again.")
return redirect_to("/login")
end
uri = URI.parse(redirect_uri.to_s)
if uri.scheme != "https" || !uri.host&.end_with?(".workos.com", ".authkit.app")
flash_next(:error, "MCP authentication error. Try again.")
return redirect_to("/login")
end
[303, {"Location" => redirect_uri}, []]
end
The host check is defense in depth against open redirects.
Tests
Tests run on a custom framework. Stub the HTTP boundary with webmock:
stub_request(:post, "https://api.workos.com/sso/token")
.to_return(
status: 200,
body: JSON.generate({
profile: {email: "[email protected]", first_name: "U", last_name: "X"}
})
)