require 'openid/server' require 'openid/cryptutil' require 'openid/association' require 'openid/util' require 'openid/message' require 'openid/store/memory' require 'openid/dh' require 'openid/consumer/associationmanager' require 'util' require "testutil" require 'test/unit' require 'uri' # In general, if you edit or add tests here, try to move in the # direction of testing smaller units. For testing the external # interfaces, we'll be developing an implementation-agnostic testing # suite. # for more, see /etc/ssh/moduli module OpenID ALT_MODULUS = 0xCAADDDEC1667FC68B5FA15D53C4E1532DD24561A1A2D47A12C01ABEA1E00731F6921AAC40742311FDF9E634BB7131BEE1AF240261554389A910425E044E88C8359B010F5AD2B80E29CB1A5B027B19D9E01A6F63A6F45E5D7ED2FF6A2A0085050A7D0CF307C3DB51D2490355907B4427C23A98DF1EB8ABEF2BA209BB7AFFE86A7 ALT_GEN = 5 class CatchLogs def catchlogs_setup @old_logger = Util.logger Util.logger = self.method('got_log_message') @messages = [] end def got_log_message(message) @messages << message end def teardown Util.logger = @old_logger end end class TestProtocolError < Test::Unit::TestCase def test_browserWithReturnTo return_to = "http://rp.unittest/consumer" # will be a ProtocolError raised by Decode or # CheckIDRequest.answer args = Message.from_post_args({ 'openid.mode' => 'monkeydance', 'openid.identity' => 'http://wagu.unittest/', 'openid.return_to' => return_to, }) e = Server::ProtocolError.new(args, "plucky") assert(e.has_return_to) expected_args = { 'openid.mode' => 'error', 'openid.error' => 'plucky', } rt_base, result_args = e.encode_to_url.split('?', 2) result_args = Util.parse_query(result_args) assert_equal(result_args, expected_args) end def test_browserWithReturnTo_OpenID2_GET return_to = "http://rp.unittest/consumer" # will be a ProtocolError raised by Decode or # CheckIDRequest.answer args = Message.from_post_args({ 'openid.ns' => OPENID2_NS, 'openid.mode' => 'monkeydance', 'openid.identity' => 'http://wagu.unittest/', 'openid.claimed_id' => 'http://wagu.unittest/', 'openid.return_to' => return_to, }) e = Server::ProtocolError.new(args, "plucky") assert(e.has_return_to) expected_args = { 'openid.ns' => OPENID2_NS, 'openid.mode' => 'error', 'openid.error' => 'plucky', } rt_base, result_args = e.encode_to_url.split('?', 2) result_args = Util.parse_query(result_args) assert_equal(result_args, expected_args) end def test_browserWithReturnTo_OpenID2_POST return_to = "http://rp.unittest/consumer" + ('x' * OPENID1_URL_LIMIT) # will be a ProtocolError raised by Decode or # CheckIDRequest.answer args = Message.from_post_args({ 'openid.ns' => OPENID2_NS, 'openid.mode' => 'monkeydance', 'openid.identity' => 'http://wagu.unittest/', 'openid.claimed_id' => 'http://wagu.unittest/', 'openid.return_to' => return_to, }) e = Server::ProtocolError.new(args, "plucky") assert(e.has_return_to) expected_args = { 'openid.ns' => OPENID2_NS, 'openid.mode' => 'error', 'openid.error' => 'plucky', } assert(e.which_encoding == Server::ENCODE_HTML_FORM) assert(e.to_form_markup == e.to_message.to_form_markup( args.get_arg(OPENID_NS, 'return_to'))) end def test_browserWithReturnTo_OpenID1_exceeds_limit return_to = "http://rp.unittest/consumer" + ('x' * OPENID1_URL_LIMIT) # will be a ProtocolError raised by Decode or # CheckIDRequest.answer args = Message.from_post_args({ 'openid.mode' => 'monkeydance', 'openid.identity' => 'http://wagu.unittest/', 'openid.return_to' => return_to, }) e = Server::ProtocolError.new(args, "plucky") assert(e.has_return_to) expected_args = { 'openid.mode' => 'error', 'openid.error' => 'plucky', } assert(e.which_encoding == Server::ENCODE_URL) rt_base, result_args = e.encode_to_url.split('?', 2) result_args = Util.parse_query(result_args) assert_equal(result_args, expected_args) end def test_noReturnTo # will be a ProtocolError raised by Decode or # CheckIDRequest.answer args = Message.from_post_args({ 'openid.mode' => 'zebradance', 'openid.identity' => 'http://wagu.unittest/', }) e = Server::ProtocolError.new(args, "waffles") assert(!e.has_return_to) expected = "error:waffles\nmode:error\n" assert_equal(e.encode_to_kvform, expected) end def test_no_message e = Server::ProtocolError.new(nil, "no message") assert(e.get_return_to.nil?) end def test_which_encoding_no_message e = Server::ProtocolError.new(nil, "no message") assert(e.which_encoding.nil?) end end class TestDecode < Test::Unit::TestCase def setup @claimed_id = 'http://de.legating.de.coder.unittest/' @id_url = "http://decoder.am.unittest/" @rt_url = "http://rp.unittest/foobot/?qux=zam" @tr_url = "http://rp.unittest/" @assoc_handle = "{assoc}{handle}" @op_endpoint = 'http://endpoint.unittest/encode' @store = Store::Memory.new() @server = Server::Server.new(@store, @op_endpoint) @decode = Server::Decoder.new(@server).method('decode') end def test_none args = {} r = @decode.call(args) assert_equal(r, nil) end def test_irrelevant args = { 'pony' => 'spotted', 'sreg.mutant_power' => 'decaffinator', } assert_raise(Server::ProtocolError) { @decode.call(args) } end def test_bad args = { 'openid.mode' => 'twos-compliment', 'openid.pants' => 'zippered', } assert_raise(Server::ProtocolError) { @decode.call(args) } end def test_dictOfLists args = { 'openid.mode' => ['checkid_setup'], 'openid.identity' => @id_url, 'openid.assoc_handle' => @assoc_handle, 'openid.return_to' => @rt_url, 'openid.trust_root' => @tr_url, } begin result = @decode.call(args) rescue ArgumentError => err assert(!err.to_s.index('values').nil?, err) else flunk("Expected ArgumentError, but got result #{result}") end end def test_checkidImmediate args = { 'openid.mode' => 'checkid_immediate', 'openid.identity' => @id_url, 'openid.assoc_handle' => @assoc_handle, 'openid.return_to' => @rt_url, 'openid.trust_root' => @tr_url, # should be ignored 'openid.some.extension' => 'junk', } r = @decode.call(args) assert(r.is_a?(Server::CheckIDRequest)) assert_equal(r.mode, "checkid_immediate") assert_equal(r.immediate, true) assert_equal(r.identity, @id_url) assert_equal(r.trust_root, @tr_url) assert_equal(r.return_to, @rt_url) assert_equal(r.assoc_handle, @assoc_handle) end def test_checkidImmediate_constructor r = Server::CheckIDRequest.new(@id_url, @rt_url, nil, @rt_url, true, @assoc_handle) assert(r.mode == 'checkid_immediate') assert(r.immediate) end def test_checkid_missing_return_to_and_trust_root args = { 'openid.ns' => OPENID2_NS, 'openid.mode' => 'checkid_setup', 'openid.identity' => @id_url, 'openid.claimed_id' => @id_url, 'openid.assoc_handle' => @assoc_handle, } assert_raise(Server::ProtocolError) { m = Message.from_post_args(args) Server::CheckIDRequest.from_message(m, @op_endpoint) } end def test_checkid_id_select args = { 'openid.ns' => OPENID2_NS, 'openid.mode' => 'checkid_setup', 'openid.identity' => IDENTIFIER_SELECT, 'openid.claimed_id' => IDENTIFIER_SELECT, 'openid.assoc_handle' => @assoc_handle, 'openid.return_to' => @rt_url, 'openid.realm' => @tr_url, } m = Message.from_post_args(args) req = Server::CheckIDRequest.from_message(m, @op_endpoint) assert(req.id_select) end def test_checkid_not_id_select args = { 'openid.ns' => OPENID2_NS, 'openid.mode' => 'checkid_setup', 'openid.assoc_handle' => @assoc_handle, 'openid.return_to' => @rt_url, 'openid.realm' => @tr_url, } id_args = [ {'openid.claimed_id' => IDENTIFIER_SELECT, 'openid.identity' => 'http://bogus.com/'}, {'openid.claimed_id' => 'http://bogus.com/', 'openid.identity' => 'http://bogus.com/'}, ] id_args.each { |id| m = Message.from_post_args(args.merge(id)) req = Server::CheckIDRequest.from_message(m, @op_endpoint) assert(!req.id_select) } end def test_checkidSetup args = { 'openid.mode' => 'checkid_setup', 'openid.identity' => @id_url, 'openid.assoc_handle' => @assoc_handle, 'openid.return_to' => @rt_url, 'openid.trust_root' => @tr_url, } r = @decode.call(args) assert(r.is_a?(Server::CheckIDRequest)) assert_equal(r.mode, "checkid_setup") assert_equal(r.immediate, false) assert_equal(r.identity, @id_url) assert_equal(r.trust_root, @tr_url) assert_equal(r.return_to, @rt_url) end def test_checkidSetupOpenID2 args = { 'openid.ns' => OPENID2_NS, 'openid.mode' => 'checkid_setup', 'openid.identity' => @id_url, 'openid.claimed_id' => @claimed_id, 'openid.assoc_handle' => @assoc_handle, 'openid.return_to' => @rt_url, 'openid.realm' => @tr_url, } r = @decode.call(args) assert(r.is_a?(Server::CheckIDRequest)) assert_equal(r.mode, "checkid_setup") assert_equal(r.immediate, false) assert_equal(r.identity, @id_url) assert_equal(r.claimed_id, @claimed_id) assert_equal(r.trust_root, @tr_url) assert_equal(r.return_to, @rt_url) end def test_checkidSetupNoClaimedIDOpenID2 args = { 'openid.ns' => OPENID2_NS, 'openid.mode' => 'checkid_setup', 'openid.identity' => @id_url, 'openid.assoc_handle' => @assoc_handle, 'openid.return_to' => @rt_url, 'openid.realm' => @tr_url, } assert_raise(Server::ProtocolError) { @decode.call(args) } end def test_checkidSetupNoIdentityOpenID2 args = { 'openid.ns' => OPENID2_NS, 'openid.mode' => 'checkid_setup', 'openid.assoc_handle' => @assoc_handle, 'openid.return_to' => @rt_url, 'openid.realm' => @tr_url, } r = @decode.call(args) assert(r.is_a?(Server::CheckIDRequest)) assert_equal(r.mode, "checkid_setup") assert_equal(r.immediate, false) assert_equal(r.identity, nil) assert_equal(r.trust_root, @tr_url) assert_equal(r.return_to, @rt_url) end def test_checkidSetupNoReturnOpenID1 # Make sure an OpenID 1 request cannot be decoded if it lacks a # return_to. args = { 'openid.mode' => 'checkid_setup', 'openid.identity' => @id_url, 'openid.assoc_handle' => @assoc_handle, 'openid.trust_root' => @tr_url, } assert_raise(Server::ProtocolError) { @decode.call(args) } end def test_checkidSetupNoReturnOpenID2 # Make sure an OpenID 2 request with no return_to can be decoded, # and make sure a response to such a request raises # NoReturnToError. args = { 'openid.ns' => OPENID2_NS, 'openid.mode' => 'checkid_setup', 'openid.identity' => @id_url, 'openid.claimed_id' => @id_url, 'openid.assoc_handle' => @assoc_handle, 'openid.realm' => @tr_url, } req = @decode.call(args) assert(req.is_a?(Server::CheckIDRequest)) assert_raise(Server::NoReturnToError) { req.answer(false) } assert_raise(Server::NoReturnToError) { req.encode_to_url('bogus') } assert_raise(Server::NoReturnToError) { req.cancel_url } end def test_checkidSetupRealmRequiredOpenID2 # Make sure that an OpenID 2 request which lacks return_to cannot # be decoded if it lacks a realm. Spec => This value # (openid.realm) MUST be sent if openid.return_to is omitted. args = { 'openid.ns' => OPENID2_NS, 'openid.mode' => 'checkid_setup', 'openid.identity' => @id_url, 'openid.assoc_handle' => @assoc_handle, } assert_raise(Server::ProtocolError) { @decode.call(args) } end def test_checkidSetupBadReturn args = { 'openid.mode' => 'checkid_setup', 'openid.identity' => @id_url, 'openid.assoc_handle' => @assoc_handle, 'openid.return_to' => 'not a url', } begin result = @decode.call(args) rescue Server::ProtocolError => err assert(err.openid_message) else flunk("Expected ProtocolError, instead returned with #{result}") end end def test_checkidSetupUntrustedReturn args = { 'openid.mode' => 'checkid_setup', 'openid.identity' => @id_url, 'openid.assoc_handle' => @assoc_handle, 'openid.return_to' => @rt_url, 'openid.trust_root' => 'http://not-the-return-place.unittest/', } begin result = @decode.call(args) rescue Server::UntrustedReturnURL => err assert(err.openid_message, err.to_s) else flunk("Expected UntrustedReturnURL, instead returned with #{result}") end end def test_checkidSetupUntrustedReturn_Constructor assert_raise(Server::UntrustedReturnURL) { Server::CheckIDRequest.new(@id_url, @rt_url, nil, 'http://not-the-return-place.unittest/', false, @assoc_handle) } end def test_checkidSetupMalformedReturnURL_Constructor assert_raise(Server::MalformedReturnURL) { Server::CheckIDRequest.new(@id_url, 'bogus://return.url', nil, 'http://trustroot.com/', false, @assoc_handle) } end def test_checkAuth args = { 'openid.mode' => 'check_authentication', 'openid.assoc_handle' => '{dumb}{handle}', 'openid.sig' => 'sigblob', 'openid.signed' => 'identity,return_to,response_nonce,mode', 'openid.identity' => 'signedval1', 'openid.return_to' => 'signedval2', 'openid.response_nonce' => 'signedval3', 'openid.baz' => 'unsigned', } r = @decode.call(args) assert(r.is_a?(Server::CheckAuthRequest)) assert_equal(r.mode, 'check_authentication') assert_equal(r.sig, 'sigblob') end def test_checkAuthMissingSignature args = { 'openid.mode' => 'check_authentication', 'openid.assoc_handle' => '{dumb}{handle}', 'openid.signed' => 'foo,bar,mode', 'openid.foo' => 'signedval1', 'openid.bar' => 'signedval2', 'openid.baz' => 'unsigned', } assert_raise(Server::ProtocolError) { @decode.call(args) } end def test_checkAuthAndInvalidate args = { 'openid.mode' => 'check_authentication', 'openid.assoc_handle' => '{dumb}{handle}', 'openid.invalidate_handle' => '[[SMART_handle]]', 'openid.sig' => 'sigblob', 'openid.signed' => 'identity,return_to,response_nonce,mode', 'openid.identity' => 'signedval1', 'openid.return_to' => 'signedval2', 'openid.response_nonce' => 'signedval3', 'openid.baz' => 'unsigned', } r = @decode.call(args) assert(r.is_a?(Server::CheckAuthRequest)) assert_equal(r.invalidate_handle, '[[SMART_handle]]') end def test_associateDH args = { 'openid.mode' => 'associate', 'openid.session_type' => 'DH-SHA1', 'openid.dh_consumer_public' => "Rzup9265tw==", } r = @decode.call(args) assert(r.is_a?(Server::AssociateRequest)) assert_equal(r.mode, "associate") assert_equal(r.session.session_type, "DH-SHA1") assert_equal(r.assoc_type, "HMAC-SHA1") assert(r.session.consumer_pubkey) end def test_associateDHMissingKey # Trying DH assoc w/o public key args = { 'openid.mode' => 'associate', 'openid.session_type' => 'DH-SHA1', } # Using DH-SHA1 without supplying dh_consumer_public is an error. assert_raise(Server::ProtocolError) { @decode.call(args) } end def test_associateDHpubKeyNotB64 args = { 'openid.mode' => 'associate', 'openid.session_type' => 'DH-SHA1', 'openid.dh_consumer_public' => "donkeydonkeydonkey", } assert_raise(Server::ProtocolError) { @decode.call(args) } end def test_associateDHModGen # test dh with non-default but valid values for dh_modulus and # dh_gen args = { 'openid.mode' => 'associate', 'openid.session_type' => 'DH-SHA1', 'openid.dh_consumer_public' => "Rzup9265tw==", 'openid.dh_modulus' => CryptUtil.num_to_base64(ALT_MODULUS), 'openid.dh_gen' => CryptUtil.num_to_base64(ALT_GEN) , } r = @decode.call(args) assert(r.is_a?(Server::AssociateRequest)) assert_equal(r.mode, "associate") assert_equal(r.session.session_type, "DH-SHA1") assert_equal(r.assoc_type, "HMAC-SHA1") assert_equal(r.session.dh.modulus, ALT_MODULUS) assert_equal(r.session.dh.generator, ALT_GEN) assert(r.session.consumer_pubkey) end def test_associateDHCorruptModGen # test dh with non-default but valid values for dh_modulus and # dh_gen args = { 'openid.mode' => 'associate', 'openid.session_type' => 'DH-SHA1', 'openid.dh_consumer_public' => "Rzup9265tw==", 'openid.dh_modulus' => 'pizza', 'openid.dh_gen' => 'gnocchi', } assert_raise(Server::ProtocolError) { @decode.call(args) } end def test_associateDHMissingGen args = { 'openid.mode' => 'associate', 'openid.session_type' => 'DH-SHA1', 'openid.dh_consumer_public' => "Rzup9265tw==", 'openid.dh_modulus' => 'pizza', } assert_raise(Server::ProtocolError) { @decode.call(args) } end def test_associateDHMissingMod args = { 'openid.mode' => 'associate', 'openid.session_type' => 'DH-SHA1', 'openid.dh_consumer_public' => "Rzup9265tw==", 'openid.dh_gen' => 'pizza', } assert_raise(Server::ProtocolError) { @decode.call(args) } end # def test_associateDHInvalidModGen(self): # # test dh with properly encoded values that are not a valid # # modulus/generator combination. # args = { # 'openid.mode': 'associate', # 'openid.session_type': 'DH-SHA1', # 'openid.dh_consumer_public': "Rzup9265tw==", # 'openid.dh_modulus': cryptutil.longToBase64(9), # 'openid.dh_gen': cryptutil.longToBase64(27) , # } # self.failUnlessRaises(server.ProtocolError, self.decode, args) # test_associateDHInvalidModGen.todo = "low-priority feature" def test_associateWeirdSession args = { 'openid.mode' => 'associate', 'openid.session_type' => 'FLCL6', 'openid.dh_consumer_public' => "YQ==\n", } assert_raise(Server::ProtocolError) { @decode.call(args) } end def test_associatePlain args = { 'openid.mode' => 'associate', } r = @decode.call(args) assert(r.is_a?(Server::AssociateRequest)) assert_equal(r.mode, "associate") assert_equal(r.session.session_type, "no-encryption") assert_equal(r.assoc_type, "HMAC-SHA1") end def test_nomode args = { 'openid.session_type' => 'DH-SHA1', 'openid.dh_consumer_public' => "my public keeey", } assert_raise(Server::ProtocolError) { @decode.call(args) } end end class BogusEncoder < Server::Encoder def encode(response) return "BOGUS" end end class BogusDecoder < Server::Decoder def decode(query) return "BOGUS" end end class TestEncode < Test::Unit::TestCase def setup @encoder = Server::Encoder.new @encode = @encoder.method('encode') @op_endpoint = 'http://endpoint.unittest/encode' @store = Store::Memory.new @server = Server::Server.new(@store, @op_endpoint) end def test_id_res_OpenID2_GET # Check that when an OpenID 2 response does not exceed the OpenID # 1 message size, a GET response (i.e., redirect) is issued. request = Server::CheckIDRequest.new( 'http://bombom.unittest/', 'http://burr.unittest/999', @server.op_endpoint, 'http://burr.unittest/', false, nil) response = Server::OpenIDResponse.new(request) response.fields = Message.from_openid_args({ 'ns' => OPENID2_NS, 'mode' => 'id_res', 'identity' => request.identity, 'claimed_id' => request.identity, 'return_to' => request.return_to, }) assert(!response.render_as_form) assert(response.which_encoding == Server::ENCODE_URL) webresponse = @encode.call(response) assert(webresponse.headers.member?('location')) end def test_id_res_OpenID2_POST # Check that when an OpenID 2 response exceeds the OpenID 1 # message size, a POST response (i.e., an HTML form) is returned. request = Server::CheckIDRequest.new( 'http://bombom.unittest/', 'http://burr.unittest/999', @server.op_endpoint, 'http://burr.unittest/', false, nil) response = Server::OpenIDResponse.new(request) response.fields = Message.from_openid_args({ 'ns' => OPENID2_NS, 'mode' => 'id_res', 'identity' => request.identity, 'claimed_id' => request.identity, 'return_to' => 'x' * OPENID1_URL_LIMIT, }) assert(response.render_as_form) assert(response.encode_to_url.length > OPENID1_URL_LIMIT) assert(response.which_encoding == Server::ENCODE_HTML_FORM) webresponse = @encode.call(response) assert_equal(webresponse.body, response.to_form_markup) end def test_id_res_OpenID1_exceeds_limit # Check that when an OpenID 1 response exceeds the OpenID 1 # message size, a GET response is issued. Technically, this # shouldn't be permitted by the library, but this test is in place # to preserve the status quo for OpenID 1. request = Server::CheckIDRequest.new( 'http://bombom.unittest/', 'http://burr.unittest/999', @server.op_endpoint, 'http://burr.unittest/', false, nil) response = Server::OpenIDResponse.new(request) response.fields = Message.from_openid_args({ 'mode' => 'id_res', 'identity' => request.identity, 'return_to' => 'x' * OPENID1_URL_LIMIT, }) assert(!response.render_as_form) assert(response.encode_to_url.length > OPENID1_URL_LIMIT) assert(response.which_encoding == Server::ENCODE_URL) webresponse = @encode.call(response) assert_equal(webresponse.headers['location'], response.encode_to_url) end def test_id_res request = Server::CheckIDRequest.new( 'http://bombom.unittest/', 'http://burr.unittest/999', @server.op_endpoint, 'http://burr.unittest/', false, nil) response = Server::OpenIDResponse.new(request) response.fields = Message.from_openid_args({ 'mode' => 'id_res', 'identity' => request.identity, 'return_to' => request.return_to, }) webresponse = @encode.call(response) assert_equal(webresponse.code, Server::HTTP_REDIRECT) assert(webresponse.headers.member?('location')) location = webresponse.headers['location'] assert(location.starts_with?(request.return_to), sprintf("%s does not start with %s", location, request.return_to)) # argh. q2 = Util.parse_query(URI::parse(location).query) expected = response.fields.to_post_args assert_equal(q2, expected) end def test_cancel request = Server::CheckIDRequest.new( 'http://bombom.unittest/', 'http://burr.unittest/999', @server.op_endpoint, 'http://burr.unittest/', false, nil) response = Server::OpenIDResponse.new(request) response.fields = Message.from_openid_args({ 'mode' => 'cancel', }) webresponse = @encode.call(response) assert_equal(webresponse.code, Server::HTTP_REDIRECT) assert(webresponse.headers.member?('location')) end def test_assocReply msg = Message.new(OPENID2_NS) msg.set_arg(OPENID2_NS, 'session_type', 'no-encryption') request = Server::AssociateRequest.from_message(msg) response = Server::OpenIDResponse.new(request) response.fields = Message.from_post_args( {'openid.assoc_handle' => "every-zig"}) webresponse = @encode.call(response) body = "assoc_handle:every-zig\n" assert_equal(webresponse.code, Server::HTTP_OK) assert_equal(webresponse.headers, {}) assert_equal(webresponse.body, body) end def test_checkauthReply request = Server::CheckAuthRequest.new('a_sock_monkey', 'siggggg', []) response = Server::OpenIDResponse.new(request) response.fields = Message.from_openid_args({ 'is_valid' => 'true', 'invalidate_handle' => 'xXxX:xXXx' }) body = "invalidate_handle:xXxX:xXXx\nis_valid:true\n" webresponse = @encode.call(response) assert_equal(webresponse.code, Server::HTTP_OK) assert_equal(webresponse.headers, {}) assert_equal(webresponse.body, body) end def test_unencodableError args = Message.from_post_args({ 'openid.identity' => 'http://limu.unittest/', }) e = Server::ProtocolError.new(args, "wet paint") assert_raise(Server::EncodingError) { @encode.call(e) } end def test_encodableError args = Message.from_post_args({ 'openid.mode' => 'associate', 'openid.identity' => 'http://limu.unittest/', }) body="error:snoot\nmode:error\n" webresponse = @encode.call(Server::ProtocolError.new(args, "snoot")) assert_equal(webresponse.code, Server::HTTP_ERROR) assert_equal(webresponse.headers, {}) assert_equal(webresponse.body, body) end end class TestSigningEncode < Test::Unit::TestCase def setup @_dumb_key = Server::Signatory._dumb_key @_normal_key = Server::Signatory._normal_key @store = Store::Memory.new() @server = Server::Server.new(@store, "http://signing.unittest/enc") @request = Server::CheckIDRequest.new( 'http://bombom.unittest/', 'http://burr.unittest/999', @server.op_endpoint, 'http://burr.unittest/', false, nil) @response = Server::OpenIDResponse.new(@request) @response.fields = Message.from_openid_args({ 'mode' => 'id_res', 'identity' => @request.identity, 'return_to' => @request.return_to, }) @signatory = Server::Signatory.new(@store) @encoder = Server::SigningEncoder.new(@signatory) @encode = @encoder.method('encode') end def test_idres assoc_handle = '{bicycle}{shed}' @store.store_association( @_normal_key, Association.from_expires_in(60, assoc_handle, 'sekrit', 'HMAC-SHA1')) @request.assoc_handle = assoc_handle webresponse = @encode.call(@response) assert_equal(webresponse.code, Server::HTTP_REDIRECT) assert(webresponse.headers.member?('location')) location = webresponse.headers['location'] query = Util.parse_query(URI::parse(location).query) assert(query.member?('openid.sig')) assert(query.member?('openid.assoc_handle')) assert(query.member?('openid.signed')) end def test_idresDumb webresponse = @encode.call(@response) assert_equal(webresponse.code, Server::HTTP_REDIRECT) assert(webresponse.headers.has_key?('location')) location = webresponse.headers['location'] query = Util.parse_query(URI::parse(location).query) assert(query.member?('openid.sig')) assert(query.member?('openid.assoc_handle')) assert(query.member?('openid.signed')) end def test_forgotStore @encoder.signatory = nil assert_raise(ArgumentError) { @encode.call(@response) } end def test_cancel request = Server::CheckIDRequest.new( 'http://bombom.unittest/', 'http://burr.unittest/999', @server.op_endpoint, 'http://burr.unittest/', false, nil) response = Server::OpenIDResponse.new(request) response.fields.set_arg(OPENID_NS, 'mode', 'cancel') webresponse = @encode.call(response) assert_equal(webresponse.code, Server::HTTP_REDIRECT) assert(webresponse.headers.has_key?('location')) location = webresponse.headers['location'] query = Util.parse_query(URI::parse(location).query) assert(!query.has_key?('openid.sig'), response.fields.to_post_args()) end def test_assocReply msg = Message.new(OPENID2_NS) msg.set_arg(OPENID2_NS, 'session_type', 'no-encryption') request = Server::AssociateRequest.from_message(msg) response = Server::OpenIDResponse.new(request) response.fields = Message.from_openid_args({'assoc_handle' => "every-zig"}) webresponse = @encode.call(response) body = "assoc_handle:every-zig\n" assert_equal(webresponse.code, Server::HTTP_OK) assert_equal(webresponse.headers, {}) assert_equal(webresponse.body, body) end def test_alreadySigned @response.fields.set_arg(OPENID_NS, 'sig', 'priorSig==') assert_raise(Server::AlreadySigned) { @encode.call(@response) } end end class TestCheckID < Test::Unit::TestCase def setup @op_endpoint = 'http://endpoint.unittest/' @store = Store::Memory.new() @server = Server::Server.new(@store, @op_endpoint) @request = Server::CheckIDRequest.new( 'http://bambam.unittest/', 'http://bar.unittest/999', @server.op_endpoint, 'http://bar.unittest/', false) end def test_trustRootInvalid @request.trust_root = "http://foo.unittest/17" @request.return_to = "http://foo.unittest/39" assert(!@request.trust_root_valid()) end def test_trustRootInvalid_modified @request.trust_root = "does://not.parse/" assert_raise(Server::MalformedTrustRoot) { @request.trust_root_valid() } end def test_trustRootvalid_absent_trust_root @request.trust_root = nil assert(@request.trust_root_valid()) end def test_trustRootValid @request.trust_root = "http://foo.unittest/" @request.return_to = "http://foo.unittest/39" assert(@request.trust_root_valid()) end def test_trustRootValidNoReturnTo request = Server::CheckIDRequest.new( 'http://bambam.unittest/', nil, @server.op_endpoint, 'http://bar.unittest/', false) assert(request.trust_root_valid()) end def test_returnToVerified_callsVerify # Make sure that verifyReturnTo is calling the trustroot # function verifyReturnTo # Ensure that exceptions are passed through sentinel = Exception.new() __req = @request tc = self vrfyExc = Proc.new { |trust_root, return_to| tc.assert_equal(__req.trust_root, trust_root) tc.assert_equal(__req.return_to, return_to) raise sentinel } TrustRoot.extend(OverrideMethodMixin) TrustRoot.with_method_overridden(:verify_return_to, vrfyExc) do begin @request.return_to_verified() flunk("Expected sentinel to be raised, got success") rescue Exception => e assert(e.equal?(sentinel), [e, sentinel].inspect) end end # Ensure that True and False are passed through unchanged constVerify = Proc.new { |val| verify = Proc.new { |trust_root, return_to| tc.assert_equal(__req.trust_root, trust_root) tc.assert_equal(__req.request.return_to, return_to) return val } return verify } [true, false].each { |val| verifier = constVerify.call(val) TrustRoot.with_method_overridden(:verify_return_to, verifier) do assert_equal(val, @request.return_to_verified()) end } end def _expectAnswer(answer, identity=nil, claimed_id=nil) expected_list = [ ['mode', 'id_res'], ['return_to', @request.return_to], ['op_endpoint', @op_endpoint], ] if identity expected_list << ['identity', identity] if claimed_id expected_list << ['claimed_id', claimed_id] else expected_list << ['claimed_id', identity] end end expected_list.each { |k, expected| actual = answer.fields.get_arg(OPENID_NS, k) assert_equal(expected, actual, sprintf("%s: expected %s, got %s", k, expected, actual)) } assert(answer.fields.has_key?(OPENID_NS, 'response_nonce')) assert(answer.fields.get_openid_namespace() == OPENID2_NS) # One for nonce, one for ns assert_equal(answer.fields.to_post_args.length, expected_list.length + 2, answer.fields.to_post_args.inspect) end def test_answerAllow # Check the fields specified by "Positive Assertions" # # including mode=id_res, identity, claimed_id, op_endpoint, # return_to answer = @request.answer(true) assert_equal(answer.request, @request) _expectAnswer(answer, @request.identity) end def test_answerAllowDelegatedIdentity @request.claimed_id = 'http://delegating.unittest/' answer = @request.answer(true) _expectAnswer(answer, @request.identity, @request.claimed_id) end def test_answerAllowWithoutIdentityReally @request.identity = nil answer = @request.answer(true) assert_equal(answer.request, @request) _expectAnswer(answer) end def test_answerAllowAnonymousFail @request.identity = nil # XXX - Check on this, I think this behavior is legal in OpenID # 2.0? assert_raise(ArgumentError) { @request.answer(true, nil, "=V") } end def test_answerAllowWithIdentity @request.identity = IDENTIFIER_SELECT selected_id = 'http://anon.unittest/9861' answer = @request.answer(true, nil, selected_id) _expectAnswer(answer, selected_id) end def test_answerAllowWithNoIdentity @request.identity = IDENTIFIER_SELECT selected_id = 'http://anon.unittest/9861' assert_raise(ArgumentError) { answer = @request.answer(true, nil, nil) } end def test_immediate_openid1_no_identity @request.namespace = OPENID1_NS @request.immediate = true @request.mode = 'checkid_immediate' resp = @request.answer(false) assert(resp.fields.get_arg(OPENID_NS, 'mode') == 'id_res') end def test_checkid_setup_openid1_no_identity @request.namespace = OPENID1_NS @request.immediate = false @request.mode = 'checkid_setup' resp = @request.answer(false) assert(resp.fields.get_arg(OPENID_NS, 'mode') == 'cancel') end def test_immediate_openid1_no_server_url @request.namespace = OPENID1_NS @request.immediate = true @request.mode = 'checkid_immediate' @request.op_endpoint = nil assert_raise(ArgumentError) { resp = @request.answer(false) } end def test_immediate_encode_to_url @request.namespace = OPENID1_NS @request.immediate = true @request.mode = 'checkid_immediate' @request.trust_root = "BOGUS" @request.assoc_handle = "ASSOC" server_url = "http://server.com/server" url = @request.encode_to_url(server_url) assert(url.starts_with?(server_url)) unused, query = url.split("?", 2) args = Util.parse_query(query) m = Message.from_post_args(args) assert(m.get_arg(OPENID_NS, 'trust_root') == "BOGUS") assert(m.get_arg(OPENID_NS, 'assoc_handle') == "ASSOC") assert(m.get_arg(OPENID_NS, 'mode'), "checkid_immediate") assert(m.get_arg(OPENID_NS, 'identity') == @request.identity) assert(m.get_arg(OPENID_NS, 'claimed_id') == @request.claimed_id) assert(m.get_arg(OPENID_NS, 'return_to') == @request.return_to) end def test_answerAllowWithDelegatedIdentityOpenID2 # Answer an IDENTIFIER_SELECT case with a delegated identifier. # claimed_id delegates to selected_id here. @request.identity = IDENTIFIER_SELECT selected_id = 'http://anon.unittest/9861' claimed_id = 'http://monkeyhat.unittest/' answer = @request.answer(true, nil, selected_id, claimed_id) _expectAnswer(answer, selected_id, claimed_id) end def test_answerAllowWithDelegatedIdentityOpenID1 # claimed_id parameter doesn't exist in OpenID 1. @request.namespace = OPENID1_NS # claimed_id delegates to selected_id here. @request.identity = IDENTIFIER_SELECT selected_id = 'http://anon.unittest/9861' claimed_id = 'http://monkeyhat.unittest/' assert_raise(Server::VersionError) { @request.answer(true, nil, selected_id, claimed_id) } end def test_answerAllowWithAnotherIdentity # XXX - Check on this, I think this behavior is legal in OpenID # 2.0? assert_raise(ArgumentError){ @request.answer(true, nil, "http://pebbles.unittest/") } end def test_answerAllowNoIdentityOpenID1 @request.namespace = OPENID1_NS @request.identity = nil assert_raise(ArgumentError) { @request.answer(true, nil, nil) } end def test_answerAllowForgotEndpoint @request.op_endpoint = nil assert_raise(RuntimeError) { @request.answer(true) } end def test_checkIDWithNoIdentityOpenID1 msg = Message.new(OPENID1_NS) msg.set_arg(OPENID_NS, 'return_to', 'bogus') msg.set_arg(OPENID_NS, 'trust_root', 'bogus') msg.set_arg(OPENID_NS, 'mode', 'checkid_setup') msg.set_arg(OPENID_NS, 'assoc_handle', 'bogus') assert_raise(Server::ProtocolError) { Server::CheckIDRequest.from_message(msg, @server) } end def test_trustRootOpenID1 # Ignore openid.realm in OpenID 1 msg = Message.new(OPENID1_NS) msg.set_arg(OPENID_NS, 'mode', 'checkid_setup') msg.set_arg(OPENID_NS, 'trust_root', 'http://trustroot.com/') msg.set_arg(OPENID_NS, 'realm', 'http://fake_trust_root/') msg.set_arg(OPENID_NS, 'return_to', 'http://trustroot.com/foo') msg.set_arg(OPENID_NS, 'assoc_handle', 'bogus') msg.set_arg(OPENID_NS, 'identity', 'george') result = Server::CheckIDRequest.from_message(msg, @server.op_endpoint) assert(result.trust_root == 'http://trustroot.com/') end def test_trustRootOpenID2 # Ignore openid.trust_root in OpenID 2 msg = Message.new(OPENID2_NS) msg.set_arg(OPENID_NS, 'mode', 'checkid_setup') msg.set_arg(OPENID_NS, 'realm', 'http://trustroot.com/') msg.set_arg(OPENID_NS, 'trust_root', 'http://fake_trust_root/') msg.set_arg(OPENID_NS, 'return_to', 'http://trustroot.com/foo') msg.set_arg(OPENID_NS, 'assoc_handle', 'bogus') msg.set_arg(OPENID_NS, 'identity', 'george') msg.set_arg(OPENID_NS, 'claimed_id', 'george') result = Server::CheckIDRequest.from_message(msg, @server.op_endpoint) assert(result.trust_root == 'http://trustroot.com/') end def test_answerAllowNoTrustRoot @request.trust_root = nil answer = @request.answer(true) assert_equal(answer.request, @request) _expectAnswer(answer, @request.identity) end def test_answerImmediateDenyOpenID2 # Look for mode=setup_needed in checkid_immediate negative # response in OpenID 2 case. # # See specification Responding to Authentication Requests / # Negative Assertions / In Response to Immediate Requests. @request.mode = 'checkid_immediate' @request.immediate = true server_url = "http://setup-url.unittest/" # crappiting setup_url, you dirty my interface with your presence! answer = @request.answer(false, server_url) assert_equal(answer.request, @request) assert_equal(answer.fields.to_post_args.length, 3, answer.fields) assert_equal(answer.fields.get_openid_namespace, OPENID2_NS) assert_equal(answer.fields.get_arg(OPENID_NS, 'mode'), 'setup_needed') # user_setup_url no longer required. end def test_answerImmediateDenyOpenID1 # Look for user_setup_url in checkid_immediate negative response # in OpenID 1 case. @request.namespace = OPENID1_NS @request.mode = 'checkid_immediate' @request.immediate = true server_url = "http://setup-url.unittest/" # crappiting setup_url, you dirty my interface with your presence! answer = @request.answer(false, server_url) assert_equal(answer.request, @request) assert_equal(answer.fields.to_post_args.length, 2, answer.fields) assert_equal(answer.fields.get_openid_namespace, OPENID1_NS) assert_equal(answer.fields.get_arg(OPENID_NS, 'mode'), 'id_res') assert(answer.fields.get_arg( OPENID_NS, 'user_setup_url', '').starts_with?(server_url)) end def test_answerSetupDeny answer = @request.answer(false) assert_equal(answer.fields.get_args(OPENID_NS), { 'mode' => 'cancel', }) end def test_encodeToURL server_url = 'http://openid-server.unittest/' result = @request.encode_to_url(server_url) # How to check? How about a round-trip test. base, result_args = result.split('?', 2) result_args = Util.parse_query(result_args) message = Message.from_post_args(result_args) rebuilt_request = Server::CheckIDRequest.from_message(message, @server.op_endpoint) @request.message = message @request.instance_variables.each { |var| assert_equal(@request.instance_variable_get(var), rebuilt_request.instance_variable_get(var), var) } end def test_getCancelURL url = @request.cancel_url rt, query_string = url.split('?', -1) assert_equal(@request.return_to, rt) query = Util.parse_query(query_string) assert_equal(query, {'openid.mode' => 'cancel', 'openid.ns' => OPENID2_NS}) end def test_getCancelURLimmed @request.mode = 'checkid_immediate' @request.immediate = true assert_raise(ArgumentError) { @request.cancel_url } end end class TestCheckIDExtension < Test::Unit::TestCase def setup @op_endpoint = 'http://endpoint.unittest/ext' @store = Store::Memory.new() @server = Server::Server.new(@store, @op_endpoint) @request = Server::CheckIDRequest.new( 'http://bambam.unittest/', 'http://bar.unittest/999', @server.op_endpoint, 'http://bar.unittest/', false) @response = Server::OpenIDResponse.new(@request) @response.fields.set_arg(OPENID_NS, 'mode', 'id_res') @response.fields.set_arg(OPENID_NS, 'blue', 'star') end def test_addField namespace = 'something:' @response.fields.set_arg(namespace, 'bright', 'potato') assert_equal(@response.fields.get_args(OPENID_NS), {'blue' => 'star', 'mode' => 'id_res', }) assert_equal(@response.fields.get_args(namespace), {'bright' => 'potato'}) end def test_addFields namespace = 'mi5:' args = {'tangy' => 'suspenders', 'bravo' => 'inclusion'} @response.fields.update_args(namespace, args) assert_equal(@response.fields.get_args(OPENID_NS), {'blue' => 'star', 'mode' => 'id_res', }) assert_equal(@response.fields.get_args(namespace), args) end end class MockSignatory attr_accessor :isValid, :assocs def initialize(assoc) @isValid = true @assocs = [assoc] end def verify(assoc_handle, message) Util.assert(message.has_key?(OPENID_NS, "sig")) if self.assocs.member?([true, assoc_handle]) return @isValid else return false end end def get_association(assoc_handle, dumb) if self.assocs.member?([dumb, assoc_handle]) # This isn't a valid implementation for many uses of this # function, mind you. return true else return nil end end def invalidate(assoc_handle, dumb) if self.assocs.member?([dumb, assoc_handle]) @assocs.delete([dumb, assoc_handle]) end end end class TestCheckAuth < Test::Unit::TestCase def setup @assoc_handle = 'mooooooooo' @message = Message.from_post_args({ 'openid.sig' => 'signarture', 'one' => 'alpha', 'two' => 'beta', }) @request = Server::CheckAuthRequest.new( @assoc_handle, @message) @signatory = MockSignatory.new([true, @assoc_handle]) end def test_to_s @request.to_s end def test_valid r = @request.answer(@signatory) assert_equal({'is_valid' => 'true'}, r.fields.get_args(OPENID_NS)) assert_equal(r.request, @request) end def test_invalid @signatory.isValid = false r = @request.answer(@signatory) assert_equal({'is_valid' => 'false'}, r.fields.get_args(OPENID_NS)) end def test_replay # Don't validate the same response twice. # # From "Checking the Nonce":: # # When using "check_authentication", the OP MUST ensure that an # assertion has not yet been accepted with the same value for # "openid.response_nonce". # # In this implementation, the assoc_handle is only valid once. # And nonces are a signed component of the message, so they can't # be used with another handle without breaking the sig. r = @request.answer(@signatory) r = @request.answer(@signatory) assert_equal({'is_valid' => 'false'}, r.fields.get_args(OPENID_NS)) end def test_invalidatehandle @request.invalidate_handle = "bogusHandle" r = @request.answer(@signatory) assert_equal(r.fields.get_args(OPENID_NS), {'is_valid' => 'true', 'invalidate_handle' => "bogusHandle"}) assert_equal(r.request, @request) end def test_invalidatehandleNo assoc_handle = 'goodhandle' @signatory.assocs << [false, 'goodhandle'] @request.invalidate_handle = assoc_handle r = @request.answer(@signatory) assert_equal(r.fields.get_args(OPENID_NS), {'is_valid' => 'true'}) end end class TestAssociate < Test::Unit::TestCase # TODO: test DH with non-default values for modulus and gen. # (important to do because we actually had it broken for a while.) def setup @request = Server::AssociateRequest.from_message(Message.from_post_args({})) @store = Store::Memory.new() @signatory = Server::Signatory.new(@store) end def test_dhSHA1 @assoc = @signatory.create_association(false, 'HMAC-SHA1') consumer_dh = DiffieHellman.from_defaults() cpub = consumer_dh.public server_dh = DiffieHellman.from_defaults() session = Server::DiffieHellmanSHA1ServerSession.new(server_dh, cpub) @request = Server::AssociateRequest.new(session, 'HMAC-SHA1') response = @request.answer(@assoc) rfg = lambda { |f| response.fields.get_arg(OPENID_NS, f) } assert_equal(rfg.call("assoc_type"), "HMAC-SHA1") assert_equal(rfg.call("assoc_handle"), @assoc.handle) assert(!rfg.call("mac_key")) assert_equal(rfg.call("session_type"), "DH-SHA1") assert(rfg.call("enc_mac_key")) assert(rfg.call("dh_server_public")) enc_key = Util.from_base64(rfg.call("enc_mac_key")) spub = CryptUtil.base64_to_num(rfg.call("dh_server_public")) secret = consumer_dh.xor_secret(CryptUtil.method('sha1'), spub, enc_key) assert_equal(secret, @assoc.secret) end def test_dhSHA256 @assoc = @signatory.create_association(false, 'HMAC-SHA256') consumer_dh = DiffieHellman.from_defaults() cpub = consumer_dh.public server_dh = DiffieHellman.from_defaults() session = Server::DiffieHellmanSHA256ServerSession.new(server_dh, cpub) @request = Server::AssociateRequest.new(session, 'HMAC-SHA256') response = @request.answer(@assoc) rfg = lambda { |f| response.fields.get_arg(OPENID_NS, f) } assert_equal(rfg.call("assoc_type"), "HMAC-SHA256") assert_equal(rfg.call("assoc_handle"), @assoc.handle) assert(!rfg.call("mac_key")) assert_equal(rfg.call("session_type"), "DH-SHA256") assert(rfg.call("enc_mac_key")) assert(rfg.call("dh_server_public")) enc_key = Util.from_base64(rfg.call("enc_mac_key")) spub = CryptUtil.base64_to_num(rfg.call("dh_server_public")) secret = consumer_dh.xor_secret(CryptUtil.method('sha256'), spub, enc_key) assert_equal(secret, @assoc.secret) end def test_protoError256 s256_session = Consumer::DiffieHellmanSHA256Session.new() invalid_s256 = {'openid.assoc_type' => 'HMAC-SHA1', 'openid.session_type' => 'DH-SHA256',} invalid_s256.merge!(s256_session.get_request()) invalid_s256_2 = {'openid.assoc_type' => 'MONKEY-PIRATE', 'openid.session_type' => 'DH-SHA256',} invalid_s256_2.merge!(s256_session.get_request()) bad_request_argss = [ invalid_s256, invalid_s256_2, ] bad_request_argss.each { |request_args| message = Message.from_post_args(request_args) assert_raise(Server::ProtocolError) { Server::AssociateRequest.from_message(message) } } end def test_protoError s1_session = Consumer::DiffieHellmanSHA1Session.new() invalid_s1 = {'openid.assoc_type' => 'HMAC-SHA256', 'openid.session_type' => 'DH-SHA1',} invalid_s1.merge!(s1_session.get_request()) invalid_s1_2 = {'openid.assoc_type' => 'ROBOT-NINJA', 'openid.session_type' => 'DH-SHA1',} invalid_s1_2.merge!(s1_session.get_request()) bad_request_argss = [ {'openid.assoc_type' => 'Wha?'}, invalid_s1, invalid_s1_2, ] bad_request_argss.each { |request_args| message = Message.from_post_args(request_args) assert_raise(Server::ProtocolError) { Server::AssociateRequest.from_message(message) } } end def test_protoErrorFields contact = 'user@example.invalid' reference = 'Trac ticket number MAX_INT' error = 'poltergeist' openid1_args = { 'openid.identitiy' => 'invalid', 'openid.mode' => 'checkid_setup', } openid2_args = openid1_args.dup openid2_args.merge!({'openid.ns' => OPENID2_NS}) # Check presence of optional fields in both protocol versions openid1_msg = Message.from_post_args(openid1_args) p = Server::ProtocolError.new(openid1_msg, error, reference, contact) reply = p.to_message() assert_equal(reply.get_arg(OPENID_NS, 'reference'), reference) assert_equal(reply.get_arg(OPENID_NS, 'contact'), contact) openid2_msg = Message.from_post_args(openid2_args) p = Server::ProtocolError.new(openid2_msg, error, reference, contact) reply = p.to_message() assert_equal(reply.get_arg(OPENID_NS, 'reference'), reference) assert_equal(reply.get_arg(OPENID_NS, 'contact'), contact) end def failUnlessExpiresInMatches(msg, expected_expires_in) expires_in_str = msg.get_arg(OPENID_NS, 'expires_in', NO_DEFAULT) expires_in = expires_in_str.to_i # Slop is necessary because the tests can sometimes get run # right on a second boundary slop = 1 # second difference = expected_expires_in - expires_in error_message = sprintf('"expires_in" value not within %s of expected: ' + 'expected=%s, actual=%s', slop, expected_expires_in, expires_in) assert((0 <= difference and difference <= slop), error_message) end def test_plaintext @assoc = @signatory.create_association(false, 'HMAC-SHA1') response = @request.answer(@assoc) rfg = lambda { |f| response.fields.get_arg(OPENID_NS, f) } assert_equal(rfg.call("assoc_type"), "HMAC-SHA1") assert_equal(rfg.call("assoc_handle"), @assoc.handle) failUnlessExpiresInMatches(response.fields, @signatory.secret_lifetime) assert_equal( rfg.call("mac_key"), Util.to_base64(@assoc.secret)) assert(!rfg.call("session_type")) assert(!rfg.call("enc_mac_key")) assert(!rfg.call("dh_server_public")) end def test_plaintext256 @assoc = @signatory.create_association(false, 'HMAC-SHA256') response = @request.answer(@assoc) rfg = lambda { |f| response.fields.get_arg(OPENID_NS, f) } assert_equal(rfg.call("assoc_type"), "HMAC-SHA1") assert_equal(rfg.call("assoc_handle"), @assoc.handle) failUnlessExpiresInMatches( response.fields, @signatory.secret_lifetime) assert_equal( rfg.call("mac_key"), Util.to_base64(@assoc.secret)) assert(!rfg.call("session_type")) assert(!rfg.call("enc_mac_key")) assert(!rfg.call("dh_server_public")) end def test_unsupportedPrefer allowed_assoc = 'COLD-PET-RAT' allowed_sess = 'FROG-BONES' message = 'This is a unit test' # Set an OpenID 2 message so answerUnsupported doesn't raise # ProtocolError. @request.message = Message.new(OPENID2_NS) response = @request.answer_unsupported(message, allowed_assoc, allowed_sess) rfg = lambda { |f| response.fields.get_arg(OPENID_NS, f) } assert_equal(rfg.call('error_code'), 'unsupported-type') assert_equal(rfg.call('assoc_type'), allowed_assoc) assert_equal(rfg.call('error'), message) assert_equal(rfg.call('session_type'), allowed_sess) end def test_unsupported message = 'This is a unit test' # Set an OpenID 2 message so answerUnsupported doesn't raise # ProtocolError. @request.message = Message.new(OPENID2_NS) response = @request.answer_unsupported(message) rfg = lambda { |f| response.fields.get_arg(OPENID_NS, f) } assert_equal(rfg.call('error_code'), 'unsupported-type') assert_equal(rfg.call('assoc_type'), nil) assert_equal(rfg.call('error'), message) assert_equal(rfg.call('session_type'), nil) end def test_openid1_unsupported_explode # answer_unsupported on an associate request should explode if # the request was an OpenID 1 request. m = Message.new(OPENID1_NS) assert_raise(Server::ProtocolError) { @request.answer_unsupported(m) } end end class Counter def initialize @count = 0 end def inc @count += 1 end end class UnhandledError < Exception end class TestServer < Test::Unit::TestCase include TestUtil def setup @store = Store::Memory.new() @server = Server::Server.new(@store, "http://server.unittest/endpt") # catchlogs_setup() end def test_failed_dispatch request = Server::OpenIDRequest.new() request.mode = "monkeymode" request.namespace = OPENID1_NS assert_raise(RuntimeError) { webresult = @server.handle_request(request) } end def test_decode_request @server.decoder = BogusDecoder.new(@server) assert(@server.decode_request({}) == "BOGUS") end def test_encode_response @server.encoder = BogusEncoder.new assert(@server.encode_response(nil) == "BOGUS") end def test_dispatch monkeycalled = Counter.new() @server.extend(InstanceDefExtension) @server.instance_def(:openid_monkeymode) do raise UnhandledError end request = Server::OpenIDRequest.new() request.mode = "monkeymode" request.namespace = OPENID1_NS assert_raise(UnhandledError) { webresult = @server.handle_request(request) } end def test_associate request = Server::AssociateRequest.from_message(Message.from_post_args({})) response = @server.openid_associate(request) assert(response.fields.has_key?(OPENID_NS, "assoc_handle"), sprintf("No assoc_handle here: %s", response.fields.inspect)) end def test_associate2 # Associate when the server has no allowed association types # # Gives back an error with error_code and no fallback session or # assoc types. @server.negotiator.allowed_types = [] # Set an OpenID 2 message so answerUnsupported doesn't raise # ProtocolError. msg = Message.from_post_args({ 'openid.ns' => OPENID2_NS, 'openid.session_type' => 'no-encryption', }) request = Server::AssociateRequest.from_message(msg) response = @server.openid_associate(request) assert(response.fields.has_key?(OPENID_NS, "error")) assert(response.fields.has_key?(OPENID_NS, "error_code")) assert(!response.fields.has_key?(OPENID_NS, "assoc_handle")) assert(!response.fields.has_key?(OPENID_NS, "assoc_type")) assert(!response.fields.has_key?(OPENID_NS, "session_type")) end def test_associate3 # Request an assoc type that is not supported when there are # supported types. # # Should give back an error message with a fallback type. @server.negotiator.allowed_types = [['HMAC-SHA256', 'DH-SHA256']] msg = Message.from_post_args({ 'openid.ns' => OPENID2_NS, 'openid.session_type' => 'no-encryption', }) request = Server::AssociateRequest.from_message(msg) response = @server.openid_associate(request) assert(response.fields.has_key?(OPENID_NS, "error")) assert(response.fields.has_key?(OPENID_NS, "error_code")) assert(!response.fields.has_key?(OPENID_NS, "assoc_handle")) assert_equal(response.fields.get_arg(OPENID_NS, "assoc_type"), 'HMAC-SHA256') assert_equal(response.fields.get_arg(OPENID_NS, "session_type"), 'DH-SHA256') end def test_associate4 # DH-SHA256 association session @server.negotiator.allowed_types = [['HMAC-SHA256', 'DH-SHA256']] query = { 'openid.dh_consumer_public' => 'ALZgnx8N5Lgd7pCj8K86T/DDMFjJXSss1SKoLmxE72kJTzOtG6I2PaYrHX' + 'xku4jMQWSsGfLJxwCZ6280uYjUST/9NWmuAfcrBfmDHIBc3H8xh6RBnlXJ' + '1WxJY3jHd5k1/ZReyRZOxZTKdF/dnIqwF8ZXUwI6peV0TyS/K1fOfF/s', 'openid.assoc_type' => 'HMAC-SHA256', 'openid.session_type' => 'DH-SHA256', } message = Message.from_post_args(query) request = Server::AssociateRequest.from_message(message) response = @server.openid_associate(request) assert(response.fields.has_key?(OPENID_NS, "assoc_handle")) end def test_no_encryption_openid1 # Make sure no-encryption associate requests for OpenID 1 are # logged. assert_log_matches(/Continuing anyway./) { m = Message.from_openid_args({ 'session_type' => 'no-encryption', }) req = Server::AssociateRequest.from_message(m) } end def test_missingSessionTypeOpenID2 # Make sure session_type is required in OpenID 2 msg = Message.from_post_args({ 'openid.ns' => OPENID2_NS, }) assert_raises(Server::ProtocolError) { Server::AssociateRequest.from_message(msg) } end def test_checkAuth request = Server::CheckAuthRequest.new('arrrrrf', '0x3999', []) response = nil silence_logging { response = @server.openid_check_authentication(request) } assert(response.fields.has_key?(OPENID_NS, "is_valid")) end end class TestingRequest < Server::OpenIDRequest attr_accessor :assoc_handle, :namespace end class TestSignatory < Test::Unit::TestCase include TestUtil def setup @store = Store::Memory.new() @signatory = Server::Signatory.new(@store) @_dumb_key = @signatory.class._dumb_key @_normal_key = @signatory.class._normal_key # CatchLogs.setUp(self) end def test_get_association_nil assert_raises(ArgumentError) { @signatory.get_association(nil, false) } end def test_sign request = TestingRequest.new() assoc_handle = '{assoc}{lookatme}' @store.store_association( @_normal_key, Association.from_expires_in(60, assoc_handle, 'sekrit', 'HMAC-SHA1')) request.assoc_handle = assoc_handle request.namespace = OPENID1_NS response = Server::OpenIDResponse.new(request) response.fields = Message.from_openid_args({ 'foo' => 'amsigned', 'bar' => 'notsigned', 'azu' => 'alsosigned', }) sresponse = @signatory.sign(response) assert_equal( sresponse.fields.get_arg(OPENID_NS, 'assoc_handle'), assoc_handle) assert_equal(sresponse.fields.get_arg(OPENID_NS, 'signed'), 'assoc_handle,azu,bar,foo,signed') assert(sresponse.fields.get_arg(OPENID_NS, 'sig')) # assert(!@messages, @messages) end def test_signDumb request = TestingRequest.new() request.assoc_handle = nil request.namespace = OPENID2_NS response = Server::OpenIDResponse.new(request) response.fields = Message.from_openid_args({ 'foo' => 'amsigned', 'bar' => 'notsigned', 'azu' => 'alsosigned', 'ns' => OPENID2_NS, }) sresponse = @signatory.sign(response) assoc_handle = sresponse.fields.get_arg(OPENID_NS, 'assoc_handle') assert(assoc_handle) assoc = @signatory.get_association(assoc_handle, true) assert(assoc) assert_equal(sresponse.fields.get_arg(OPENID_NS, 'signed'), 'assoc_handle,azu,bar,foo,ns,signed') assert(sresponse.fields.get_arg(OPENID_NS, 'sig')) # assert(!@messages, @messages) end def test_signExpired # Sign a response to a message with an expired handle (using # invalidate_handle). # # From "Verifying with an Association": # # If an authentication request included an association handle # for an association between the OP and the Relying party, and # the OP no longer wishes to use that handle (because it has # expired or the secret has been compromised, for instance), # the OP will send a response that must be verified directly # with the OP, as specified in Section 11.3.2. In that # instance, the OP will include the field # "openid.invalidate_handle" set to the association handle # that the Relying Party included with the original request. request = TestingRequest.new() request.namespace = OPENID2_NS assoc_handle = '{assoc}{lookatme}' @store.store_association( @_normal_key, Association.from_expires_in(-10, assoc_handle, 'sekrit', 'HMAC-SHA1')) assert(@store.get_association(@_normal_key, assoc_handle)) request.assoc_handle = assoc_handle response = Server::OpenIDResponse.new(request) response.fields = Message.from_openid_args({ 'foo' => 'amsigned', 'bar' => 'notsigned', 'azu' => 'alsosigned', }) sresponse = nil silence_logging { sresponse = @signatory.sign(response) } new_assoc_handle = sresponse.fields.get_arg(OPENID_NS, 'assoc_handle') assert(new_assoc_handle) assert(new_assoc_handle != assoc_handle) assert_equal( sresponse.fields.get_arg(OPENID_NS, 'invalidate_handle'), assoc_handle) assert_equal(sresponse.fields.get_arg(OPENID_NS, 'signed'), 'assoc_handle,azu,bar,foo,invalidate_handle,signed') assert(sresponse.fields.get_arg(OPENID_NS, 'sig')) # make sure the expired association is gone assert(!@store.get_association(@_normal_key, assoc_handle), "expired association is still retrievable.") # make sure the new key is a dumb mode association assert(@store.get_association(@_dumb_key, new_assoc_handle)) assert(!@store.get_association(@_normal_key, new_assoc_handle)) # assert(@messages) end def test_signInvalidHandle request = TestingRequest.new() request.namespace = OPENID2_NS assoc_handle = '{bogus-assoc}{notvalid}' request.assoc_handle = assoc_handle response = Server::OpenIDResponse.new(request) response.fields = Message.from_openid_args({ 'foo' => 'amsigned', 'bar' => 'notsigned', 'azu' => 'alsosigned', }) sresponse = @signatory.sign(response) new_assoc_handle = sresponse.fields.get_arg(OPENID_NS, 'assoc_handle') assert(new_assoc_handle) assert(new_assoc_handle != assoc_handle) assert_equal( sresponse.fields.get_arg(OPENID_NS, 'invalidate_handle'), assoc_handle) assert_equal( sresponse.fields.get_arg(OPENID_NS, 'signed'), 'assoc_handle,azu,bar,foo,invalidate_handle,signed') assert(sresponse.fields.get_arg(OPENID_NS, 'sig')) # make sure the new key is a dumb mode association assert(@store.get_association(@_dumb_key, new_assoc_handle)) assert(!@store.get_association(@_normal_key, new_assoc_handle)) # @failIf(@messages, @messages) end def test_verify assoc_handle = '{vroom}{zoom}' assoc = Association.from_expires_in( 60, assoc_handle, 'sekrit', 'HMAC-SHA1') @store.store_association(@_dumb_key, assoc) signed = Message.from_post_args({ 'openid.foo' => 'bar', 'openid.apple' => 'orange', 'openid.assoc_handle' => assoc_handle, 'openid.signed' => 'apple,assoc_handle,foo,signed', 'openid.sig' => 'uXoT1qm62/BB09Xbj98TQ8mlBco=', }) verified = @signatory.verify(assoc_handle, signed) assert(verified) # assert(!@messages, @messages) end def test_verifyBadSig assoc_handle = '{vroom}{zoom}' assoc = Association.from_expires_in( 60, assoc_handle, 'sekrit', 'HMAC-SHA1') @store.store_association(@_dumb_key, assoc) signed = Message.from_post_args({ 'openid.foo' => 'bar', 'openid.apple' => 'orange', 'openid.assoc_handle' => assoc_handle, 'openid.signed' => 'apple,assoc_handle,foo,signed', 'openid.sig' => 'uXoT1qm62/BB09Xbj98TQ8mlBco=BOGUS' }) verified = @signatory.verify(assoc_handle, signed) # @failIf(@messages, @messages) assert(!verified) end def test_verifyBadHandle assoc_handle = '{vroom}{zoom}' signed = Message.from_post_args({ 'foo' => 'bar', 'apple' => 'orange', 'openid.sig' => "Ylu0KcIR7PvNegB/K41KpnRgJl0=", }) verified = nil silence_logging { verified = @signatory.verify(assoc_handle, signed) } assert(!verified) #assert(@messages) end def test_verifyAssocMismatch # Attempt to validate sign-all message with a signed-list assoc. assoc_handle = '{vroom}{zoom}' assoc = Association.from_expires_in( 60, assoc_handle, 'sekrit', 'HMAC-SHA1') @store.store_association(@_dumb_key, assoc) signed = Message.from_post_args({ 'foo' => 'bar', 'apple' => 'orange', 'openid.sig' => "d71xlHtqnq98DonoSgoK/nD+QRM=", }) verified = nil silence_logging { verified = @signatory.verify(assoc_handle, signed) } assert(!verified) #assert(@messages) end def test_getAssoc assoc_handle = makeAssoc(true) assoc = @signatory.get_association(assoc_handle, true) assert(assoc) assert_equal(assoc.handle, assoc_handle) # @failIf(@messages, @messages) end def test_getAssocExpired assoc_handle = makeAssoc(true, -10) assoc = nil silence_logging { assoc = @signatory.get_association(assoc_handle, true) } assert(!assoc, assoc) # assert(@messages) end def test_getAssocInvalid ah = 'no-such-handle' silence_logging { assert_equal( @signatory.get_association(ah, false), nil) } # assert(!@messages, @messages) end def test_getAssocDumbVsNormal # getAssociation(dumb=False) cannot get a dumb assoc assoc_handle = makeAssoc(true) silence_logging { assert_equal( @signatory.get_association(assoc_handle, false), nil) } # @failIf(@messages, @messages) end def test_getAssocNormalVsDumb # getAssociation(dumb=True) cannot get a shared assoc # # From "Verifying Directly with the OpenID Provider":: # # An OP MUST NOT verify signatures for associations that have shared # MAC keys. assoc_handle = makeAssoc(false) silence_logging { assert_equal( @signatory.get_association(assoc_handle, true), nil) } # @failIf(@messages, @messages) end def test_createAssociation assoc = @signatory.create_association(false) silence_logging { assert(@signatory.get_association(assoc.handle, false)) } # @failIf(@messages, @messages) end def makeAssoc(dumb, lifetime=60) assoc_handle = '{bling}' assoc = Association.from_expires_in(lifetime, assoc_handle, 'sekrit', 'HMAC-SHA1') silence_logging { @store.store_association(((dumb and @_dumb_key) or @_normal_key), assoc) } return assoc_handle end def test_invalidate assoc_handle = '-squash-' assoc = Association.from_expires_in(60, assoc_handle, 'sekrit', 'HMAC-SHA1') silence_logging { @store.store_association(@_dumb_key, assoc) assoc = @signatory.get_association(assoc_handle, true) assert(assoc) assoc = @signatory.get_association(assoc_handle, true) assert(assoc) @signatory.invalidate(assoc_handle, true) assoc = @signatory.get_association(assoc_handle, true) assert(!assoc) } # @failIf(@messages, @messages) end end class RunthroughTestCase < Test::Unit::TestCase def setup @store = Store::Memory.new @server = Server::Server.new(@store, "http://example.com/openid/server") end def test_openid1_assoc_checkid assoc_args = {'openid.mode' => 'associate', 'openid.assoc_type' => 'HMAC-SHA1'} areq = @server.decode_request(assoc_args) aresp = @server.handle_request(areq) amess = aresp.fields assert(amess.is_openid1) ahandle = amess.get_arg(OPENID_NS, 'assoc_handle') assert(ahandle) assoc = @store.get_association('http://localhost/|normal', ahandle) assert(assoc.is_a?(Association)) checkid_args = {'openid.mode' => 'checkid_setup', 'openid.return_to' => 'http://example.com/openid/consumer', 'openid.assoc_handle' => ahandle, 'openid.identity' => 'http://foo.com/'} cireq = @server.decode_request(checkid_args) ciresp = cireq.answer(true) signed_resp = @server.signatory.sign(ciresp) assert_equal(assoc.get_message_signature(signed_resp.fields), signed_resp.fields.get_arg(OPENID_NS, 'sig')) assert(assoc.check_message_signature(signed_resp.fields)) end end end