[rubygems/rubygems] Add WebauthnListener class

d42ddbb73c

Co-authored-by: Ashley Ellis Pierce <anellis12@gmail.com>
Co-authored-by: Jacques Chester <jacques.chester@shopify.com>
This commit is contained in:
Jenny Shen 2023-02-15 10:51:43 -05:00 committed by Hiroshi SHIBATA
parent 27322e51a7
commit 6e7bf0677d
2 changed files with 203 additions and 0 deletions

View file

@ -0,0 +1,83 @@
# frozen_string_literal: true
require_relative "webauthn_listener/response/response_ok"
require_relative "webauthn_listener/response/response_no_content"
require_relative "webauthn_listener/response/response_bad_request"
require_relative "webauthn_listener/response/response_not_found"
require_relative "webauthn_listener/response/response_method_not_allowed"
##
# The WebauthnListener class retrieves an OTP after a user successfully WebAuthns with the Gem host.
# An instance opens a socket using the TCPServer instance given and listens for a request from the Gem host.
# The request should be a GET request to the root path and contains the OTP code in the form
# of a query parameter `code`. The listener will return the code which will be used as the OTP for
# API requests.
#
# Types of responses sent by the listener after receiving a request:
# - 200 OK: OTP code was successfully retrieved
# - 204 No Content: If the request was an OPTIONS request
# - 400 Bad Request: If the request did not contain a query parameter `code`
# - 404 Not Found: The request was not to the root path
# - 405 Method Not Allowed: OTP code was not retrieved because the request was not a GET/OPTIONS request
#
# Example usage:
#
# server = TCPServer.new(0)
# otp = Gem::WebauthnListener.wait_for_otp_code("https://rubygems.example", server)
#
class Gem::WebauthnListener
attr_reader :host
def initialize(host)
@host = host
end
def self.wait_for_otp_code(host, server)
new(host).fetch_otp_from_connection(server)
end
def fetch_otp_from_connection(server)
loop do
socket = server.accept
request_line = socket.gets
method, req_uri, _protocol = request_line.split(" ")
req_uri = URI.parse(req_uri)
unless root_path?(req_uri)
ResponseNotFound.send(socket, host)
raise Gem::WebauthnVerificationError, "Page at #{req_uri.path} not found."
end
case method.upcase
when "OPTIONS"
ResponseNoContent.send(socket, host)
next # will be GET
when "GET"
if otp = parse_otp_from_uri(req_uri)
ResponseOk.send(socket, host)
return otp
end
ResponseBadRequest.send(socket, host)
raise Gem::WebauthnVerificationError, "Did not receive OTP from #{host}."
else
ResponseMethodNotAllowed.send(socket, host)
raise Gem::WebauthnVerificationError, "Invalid HTTP method #{method.upcase} received."
end
end
end
private
def root_path?(uri)
uri.path == "/"
end
def parse_otp_from_uri(uri)
require "cgi"
return if uri.query.nil?
CGI.parse(uri.query).dig("code", 0)
end
end

View file

@ -0,0 +1,120 @@
# frozen_string_literal: true
require_relative "helper"
require "rubygems/webauthn_listener"
class WebauthnListenerTest < Gem::TestCase
def setup
super
@server = TCPServer.new 0
@port = @server.addr[1].to_s
end
def test_wait_for_otp_code_get_follows_options
wait_for_otp_code
assert Gem::MockBrowser.options(URI("http://localhost:#{@port}?code=xyz")).is_a? Net::HTTPNoContent
assert Gem::MockBrowser.get(URI("http://localhost:#{@port}?code=xyz")).is_a? Net::HTTPOK
end
def test_wait_for_otp_code_options_request
wait_for_otp_code
response = Gem::MockBrowser.options URI("http://localhost:#{@port}?code=xyz")
assert response.is_a? Net::HTTPNoContent
assert_equal Gem.host, response["access-control-allow-origin"]
assert_equal "POST", response["access-control-allow-methods"]
assert_equal "Content-Type, Authorization, x-csrf-token", response["access-control-allow-headers"]
assert_equal "close", response["Connection"]
end
def test_wait_for_otp_code_get_request
wait_for_otp_code
response = Gem::MockBrowser.get URI("http://localhost:#{@port}?code=xyz")
assert response.is_a? Net::HTTPOK
assert_equal "text/plain", response["Content-Type"]
assert_equal "7", response["Content-Length"]
assert_equal Gem.host, response["access-control-allow-origin"]
assert_equal "POST", response["access-control-allow-methods"]
assert_equal "Content-Type, Authorization, x-csrf-token", response["access-control-allow-headers"]
assert_equal "close", response["Connection"]
assert_equal "success", response.body
@thread.join
assert_equal "xyz", @thread[:otp]
end
def test_wait_for_otp_code_invalid_post_req_method
wait_for_otp_code_expect_error_with_message("Security device verification failed: Invalid HTTP method POST received.")
response = Gem::MockBrowser.post URI("http://localhost:#{@port}?code=xyz")
assert response
assert response.is_a? Net::HTTPMethodNotAllowed
assert_equal "GET, OPTIONS", response["allow"]
assert_equal "close", response["Connection"]
@thread.join
assert_nil @thread[:otp]
end
def test_wait_for_otp_code_incorrect_path
wait_for_otp_code_expect_error_with_message("Security device verification failed: Page at /path not found.")
response = Gem::MockBrowser.post URI("http://localhost:#{@port}/path?code=xyz")
assert response.is_a? Net::HTTPNotFound
assert_equal "close", response["Connection"]
@thread.join
assert_nil @thread[:otp]
end
def test_wait_for_otp_code_no_params_response
wait_for_otp_code_expect_error_with_message("Security device verification failed: Did not receive OTP from https://rubygems.org.")
response = Gem::MockBrowser.get URI("http://localhost:#{@port}")
assert response.is_a? Net::HTTPBadRequest
assert_equal "text/plain", response["Content-Type"]
assert_equal "22", response["Content-Length"]
assert_equal "close", response["Connection"]
assert_equal "missing code parameter", response.body
@thread.join
assert_nil @thread[:otp]
end
def test_wait_for_otp_code_incorrect_params
wait_for_otp_code_expect_error_with_message("Security device verification failed: Did not receive OTP from https://rubygems.org.")
response = Gem::MockBrowser.get URI("http://localhost:#{@port}?param=xyz")
assert response.is_a? Net::HTTPBadRequest
assert_equal "text/plain", response["Content-Type"]
assert_equal "22", response["Content-Length"]
assert_equal "close", response["Connection"]
assert_equal "missing code parameter", response.body
@thread.join
assert_nil @thread[:otp]
end
private
def wait_for_otp_code
@thread = Thread.new do
Thread.current[:otp] = Gem::WebauthnListener.wait_for_otp_code(Gem.host, @server)
end
@thread.abort_on_exception = true
@thread.report_on_exception = false
end
def wait_for_otp_code_expect_error_with_message(message)
@thread = Thread.new do
error = assert_raise Gem::WebauthnVerificationError do
Thread.current[:otp] = Gem::WebauthnListener.wait_for_otp_code(Gem.host, @server)
end
assert_equal message, error.message
end
@thread.abort_on_exception = true
@thread.report_on_exception = false
end
end