From 568e5c05e61428b9d2b4f1292f5fdbdfdf966d3b Mon Sep 17 00:00:00 2001 From: Mike Fiedler Date: Fri, 22 Aug 2025 10:30:38 -0400 Subject: [PATCH] feat: add rate limiting to 2fa attempts Limits specifically calls to the 2FA-related actions: - checking a recovery code - checking a TOTP value - checking a Webauthn value The rate limits were selected to be a balance of usability vs how long it would take a slow-roll actor to continue trying. Metrics are emitted for monitoring and alerting purposes. Refs: #8456 Signed-off-by: Mike Fiedler --- tests/unit/accounts/test_core.py | 10 + tests/unit/accounts/test_services.py | 340 +++++++++++++++++++++++++-- tests/unit/test_config.py | 2 + warehouse/accounts/__init__.py | 13 + warehouse/accounts/services.py | 75 ++++-- warehouse/config.py | 13 + 6 files changed, 417 insertions(+), 36 deletions(-) diff --git a/tests/unit/accounts/test_core.py b/tests/unit/accounts/test_core.py index 409f3e26b239..14cd5dde9c9f 100644 --- a/tests/unit/accounts/test_core.py +++ b/tests/unit/accounts/test_core.py @@ -138,6 +138,8 @@ def test_includeme(monkeypatch): "warehouse.account.user_login_ratelimit_string": "10 per 5 minutes", "warehouse.account.ip_login_ratelimit_string": "10 per 5 minutes", "warehouse.account.global_login_ratelimit_string": "1000 per 5 minutes", + "warehouse.account.2fa_user_ratelimit_string": "5 per 5 minutes, 20 per hour, 50 per day", # noqa: E501 + "warehouse.account.2fa_ip_ratelimit_string": "10 per 5 minutes, 50 per hour", # noqa: E501 "warehouse.account.email_add_ratelimit_string": "2 per day", "warehouse.account.verify_email_ratelimit_string": "3 per 6 hours", "warehouse.account.password_reset_ratelimit_string": "5 per day", @@ -184,6 +186,14 @@ def test_includeme(monkeypatch): pretend.call( RateLimit("1000 per 5 minutes"), IRateLimiter, name="global.login" ), + pretend.call( + RateLimit("5 per 5 minutes, 20 per hour, 50 per day"), + IRateLimiter, + name="2fa.user", + ), + pretend.call( + RateLimit("10 per 5 minutes, 50 per hour"), IRateLimiter, name="2fa.ip" + ), pretend.call(RateLimit("2 per day"), IRateLimiter, name="email.add"), pretend.call(RateLimit("5 per day"), IRateLimiter, name="password.reset"), pretend.call(RateLimit("3 per 6 hours"), IRateLimiter, name="email.verify"), diff --git a/tests/unit/accounts/test_services.py b/tests/unit/accounts/test_services.py index 74501b53d1a1..02be1849ef4f 100644 --- a/tests/unit/accounts/test_services.py +++ b/tests/unit/accounts/test_services.py @@ -639,10 +639,13 @@ def test_check_totp_value_no_secret(self, user_service): with pytest.raises(otp.InvalidTOTPError): user_service.check_totp_value(user.id, b"123456") - def test_check_totp_global_rate_limited(self, user_service, metrics): + def test_check_totp_ip_rate_limited(self, user_service, metrics): resets = pretend.stub() - limiter = pretend.stub(test=lambda: False, resets_in=lambda: resets) - user_service.ratelimiters["global.login"] = limiter + limiter = pretend.stub( + test=pretend.call_recorder(lambda uid: False), + resets_in=pretend.call_recorder(lambda uid: resets), + ) + user_service.ratelimiters["2fa.ip"] = limiter with pytest.raises(TooManyFailedLogins) as excinfo: user_service.check_totp_value(uuid.uuid4(), b"123456", tags=["foo"]) @@ -655,7 +658,7 @@ def test_check_totp_global_rate_limited(self, user_service, metrics): ), pretend.call( "warehouse.authentication.ratelimited", - tags=["foo", "mechanism:check_totp_value", "ratelimiter:global"], + tags=["foo", "mechanism:check_totp_value", "ratelimiter:ip"], ), ] @@ -666,7 +669,7 @@ def test_check_totp_value_user_rate_limited(self, user_service, metrics): test=pretend.call_recorder(lambda uid: False), resets_in=pretend.call_recorder(lambda uid: resets), ) - user_service.ratelimiters["user.login"] = limiter + user_service.ratelimiters["2fa.user"] = limiter with pytest.raises(TooManyFailedLogins) as excinfo: user_service.check_totp_value(user.id, b"123456") @@ -690,28 +693,314 @@ def test_check_totp_value_invalid_secret(self, user_service): limiter = pretend.stub( hit=pretend.call_recorder(lambda *a, **kw: None), test=lambda *a, **kw: True ) - user_service.ratelimiters["user.login"] = limiter - user_service.ratelimiters["global.login"] = limiter + user_service.ratelimiters["2fa.user"] = limiter + user_service.ratelimiters["2fa.ip"] = limiter valid = user_service.check_totp_value(user.id, b"123456") assert not valid - assert limiter.hit.calls == [pretend.call(user.id), pretend.call()] + assert limiter.hit.calls == [pretend.call(user.id), pretend.call(REMOTE_ADDR)] def test_check_totp_value_invalid_totp(self, user_service, monkeypatch): user = UserFactory.create() limiter = pretend.stub( - hit=pretend.call_recorder(lambda *a, **kw: None), test=lambda *a, **kw: True + hit=pretend.call_recorder(lambda ip: None), + test=pretend.call_recorder(lambda uid: True), ) user_service.get_totp_secret = lambda uid: "secret" monkeypatch.setattr(otp, "verify_totp", lambda secret, value: False) - user_service.ratelimiters["user.login"] = limiter - user_service.ratelimiters["global.login"] = limiter + user_service.ratelimiters["2fa.user"] = limiter + user_service.ratelimiters["2fa.ip"] = limiter valid = user_service.check_totp_value(user.id, b"123456") assert not valid - assert limiter.hit.calls == [pretend.call(user.id), pretend.call()] + assert limiter.test.calls == [pretend.call(REMOTE_ADDR), pretend.call(user.id)] + assert limiter.hit.calls == [pretend.call(user.id), pretend.call(REMOTE_ADDR)] + + def test_check_totp_value_with_2fa_rate_limiters( + self, db_session, metrics, monkeypatch + ): + """Test that check_totp_value uses new 2FA rate limiters when available.""" + user = UserFactory.create() + + # Create mocked rate limiters + ratelimiters = { + "user.login": pretend.stub(test=lambda *a: True, hit=lambda *a: None), + "ip.login": pretend.stub(test=lambda *a: True, hit=lambda *a: None), + "global.login": pretend.stub(test=lambda: True, hit=lambda: None), + "2fa.user": pretend.stub( + test=pretend.call_recorder(lambda uid: True), + hit=pretend.call_recorder(lambda uid: None), + ), + "2fa.ip": pretend.stub( + test=pretend.call_recorder(lambda addr: True), + hit=pretend.call_recorder(lambda addr: None), + ), + } + + user_service = services.DatabaseUserService( + db_session, + metrics=metrics, + remote_addr=REMOTE_ADDR, + ratelimiters=ratelimiters, + ) + + # Mock TOTP verification to fail + monkeypatch.setattr(otp, "verify_totp", lambda secret, value: False) + user_service.update_user(user.id, totp_secret=b"secret") + + result = user_service.check_totp_value(user.id, b"123456") + + assert not result + # Should use 2FA rate limiters, not login rate limiters + assert ratelimiters["2fa.user"].test.calls == [pretend.call(user.id)] + assert ratelimiters["2fa.ip"].test.calls == [pretend.call(REMOTE_ADDR)] + + def test_check_2fa_ratelimits_ip_limited(self, db_session, metrics): + """Test IP-based 2FA rate limiting.""" + user = UserFactory.create() + resets = pretend.stub() + + ratelimiters = { + "2fa.ip": pretend.stub( + test=pretend.call_recorder(lambda addr: False), + resets_in=pretend.call_recorder(lambda addr: resets), + ), + "2fa.user": pretend.stub(test=lambda *a: True), + } + + user_service = services.DatabaseUserService( + db_session, + metrics=metrics, + remote_addr=REMOTE_ADDR, + ratelimiters=ratelimiters, + ) + + with pytest.raises(TooManyFailedLogins) as excinfo: + user_service._check_2fa_ratelimits(userid=user.id, tags=["test_tag"]) + + assert excinfo.value.resets_in is resets + assert ratelimiters["2fa.ip"].test.calls == [pretend.call(REMOTE_ADDR)] + assert metrics.increment.calls == [ + pretend.call( + "warehouse.authentication.ratelimited", + tags=["test_tag", "ratelimiter:ip"], + ), + ] + + def test_check_2fa_ratelimits_user_limited(self, db_session, metrics): + """Test user-based 2FA rate limiting.""" + user = UserFactory.create() + resets = pretend.stub() + + ratelimiters = { + "2fa.ip": pretend.stub(test=lambda *a: True), + "2fa.user": pretend.stub( + test=pretend.call_recorder(lambda uid: False), + resets_in=pretend.call_recorder(lambda uid: resets), + ), + } + + user_service = services.DatabaseUserService( + db_session, + metrics=metrics, + remote_addr=REMOTE_ADDR, + ratelimiters=ratelimiters, + ) + + with pytest.raises(TooManyFailedLogins) as excinfo: + user_service._check_2fa_ratelimits(userid=user.id, tags=["test_tag"]) + + assert excinfo.value.resets_in is resets + assert ratelimiters["2fa.user"].test.calls == [pretend.call(user.id)] + assert metrics.increment.calls == [ + pretend.call( + "warehouse.authentication.ratelimited", + tags=["test_tag", "ratelimiter:user"], + ), + ] + + def test_check_2fa_ratelimits_no_remote_addr(self, db_session, metrics): + """Test 2FA rate limiting when remote_addr is None.""" + user = UserFactory.create() + + ratelimiters = { + "2fa.ip": pretend.stub( + test=pretend.call_recorder(lambda addr: True), + ), + "2fa.user": pretend.stub(test=lambda *a: True), + } + + user_service = services.DatabaseUserService( + db_session, + metrics=metrics, + remote_addr=None, + ratelimiters=ratelimiters, + ) + + # Should not raise, IP check should be skipped + user_service._check_2fa_ratelimits(userid=user.id) + + # IP limiter should not be called + assert ratelimiters["2fa.ip"].test.calls == [] + + def test_hit_2fa_ratelimits(self, db_session, metrics): + """Test hitting 2FA rate limits records properly.""" + user = UserFactory.create() + + ratelimiters = { + "2fa.user": pretend.stub(hit=pretend.call_recorder(lambda uid: None)), + "2fa.ip": pretend.stub(hit=pretend.call_recorder(lambda addr: None)), + } + + user_service = services.DatabaseUserService( + db_session, + metrics=metrics, + remote_addr=REMOTE_ADDR, + ratelimiters=ratelimiters, + ) + + user_service._hit_2fa_ratelimits(userid=user.id) + + assert ratelimiters["2fa.user"].hit.calls == [pretend.call(user.id)] + assert ratelimiters["2fa.ip"].hit.calls == [pretend.call(REMOTE_ADDR)] + + def test_hit_2fa_ratelimits_no_remote_addr(self, db_session, metrics): + """Test hitting 2FA rate limits when remote_addr is None.""" + user = UserFactory.create() + + ratelimiters = { + "2fa.user": pretend.stub(hit=pretend.call_recorder(lambda uid: None)), + "2fa.ip": pretend.stub(hit=pretend.call_recorder(lambda addr: None)), + } + + user_service = services.DatabaseUserService( + db_session, + metrics=metrics, + remote_addr=None, + ratelimiters=ratelimiters, + ) + + user_service._hit_2fa_ratelimits(userid=user.id) + + # Only user limiter should be hit + assert ratelimiters["2fa.user"].hit.calls == [pretend.call(user.id)] + assert ratelimiters["2fa.ip"].hit.calls == [] + + def test_verify_webauthn_assertion_rate_limited( + self, db_session, metrics, monkeypatch + ): + """Test that verify_webauthn_assertion uses 2FA rate limiters.""" + user = UserFactory.create() + resets = pretend.stub() + + ratelimiters = { + "2fa.user": pretend.stub( + test=pretend.call_recorder(lambda uid: False), + resets_in=pretend.call_recorder(lambda uid: resets), + ), + "2fa.ip": pretend.stub(test=lambda *a: True), + } + + user_service = services.DatabaseUserService( + db_session, + metrics=metrics, + remote_addr=REMOTE_ADDR, + ratelimiters=ratelimiters, + ) + + with pytest.raises(TooManyFailedLogins) as excinfo: + user_service.verify_webauthn_assertion( + user.id, + b"assertion", + challenge=b"challenge", + origin="https://example.com", + rp_id="example.com", + ) + + assert excinfo.value.resets_in is resets + assert ratelimiters["2fa.user"].test.calls == [pretend.call(user.id)] + assert metrics.increment.calls == [ + pretend.call( + "warehouse.authentication.ratelimited", + tags=["mechanism:webauthn", "ratelimiter:user"], + ), + ] + + def test_verify_webauthn_assertion_failure_hits_ratelimits( + self, db_session, metrics, monkeypatch + ): + """Test that failed WebAuthn assertions hit 2FA rate limiters.""" + user = UserFactory.create() + + ratelimiters = { + "2fa.user": pretend.stub( + test=lambda *a: True, hit=pretend.call_recorder(lambda uid: None) + ), + "2fa.ip": pretend.stub( + test=lambda *a: True, hit=pretend.call_recorder(lambda addr: None) + ), + } + + user_service = services.DatabaseUserService( + db_session, + metrics=metrics, + remote_addr=REMOTE_ADDR, + ratelimiters=ratelimiters, + ) + + # Mock webauthn to raise AuthenticationRejectedError + monkeypatch.setattr( + webauthn, + "verify_assertion_response", + pretend.raiser(webauthn.AuthenticationRejectedError("test error")), + ) + + with pytest.raises(webauthn.AuthenticationRejectedError): + user_service.verify_webauthn_assertion( + user.id, + b"assertion", + challenge=b"challenge", + origin="https://example.com", + rp_id="example.com", + ) + + assert ratelimiters["2fa.user"].hit.calls == [pretend.call(user.id)] + assert ratelimiters["2fa.ip"].hit.calls == [pretend.call(REMOTE_ADDR)] + + def test_check_recovery_code_uses_2fa_ratelimits(self, db_session, metrics): + """Test that check_recovery_code uses 2FA rate limiters.""" + user = UserFactory.create() + resets = pretend.stub() + + ratelimiters = { + "2fa.ip": pretend.stub( + test=pretend.call_recorder(lambda addr: False), + resets_in=pretend.call_recorder(lambda addr: resets), + ), + "2fa.user": pretend.stub(test=lambda *a: True), + } + + user_service = services.DatabaseUserService( + db_session, + metrics=metrics, + remote_addr=REMOTE_ADDR, + ratelimiters=ratelimiters, + ) + + with pytest.raises(TooManyFailedLogins) as excinfo: + user_service.check_recovery_code(user.id, "code") + + assert excinfo.value.resets_in is resets + assert ratelimiters["2fa.ip"].test.calls == [pretend.call(REMOTE_ADDR)] + assert metrics.increment.calls == [ + pretend.call("warehouse.authentication.recovery_code.start"), + pretend.call( + "warehouse.authentication.ratelimited", + tags=["mechanism:check_recovery_code", "ratelimiter:ip"], + ), + ] def test_get_webauthn_credential_options(self, user_service): user = UserFactory.create() @@ -978,10 +1267,13 @@ def test_check_recovery_code(self, user_service, metrics): ), ] - def test_check_recovery_code_global_rate_limited(self, user_service, metrics): + def test_check_recovery_code_ip_rate_limited(self, user_service, metrics): resets = pretend.stub() - limiter = pretend.stub(test=lambda: False, resets_in=lambda: resets) - user_service.ratelimiters["global.login"] = limiter + limiter = pretend.stub( + test=pretend.call_recorder(lambda uid: False), + resets_in=pretend.call_recorder(lambda uid: resets), + ) + user_service.ratelimiters["2fa.ip"] = limiter with pytest.raises(TooManyFailedLogins) as excinfo: user_service.check_recovery_code(uuid.uuid4(), "recovery_code") @@ -991,7 +1283,7 @@ def test_check_recovery_code_global_rate_limited(self, user_service, metrics): pretend.call("warehouse.authentication.recovery_code.start"), pretend.call( "warehouse.authentication.ratelimited", - tags=["mechanism:check_recovery_code", "ratelimiter:global"], + tags=["mechanism:check_recovery_code", "ratelimiter:ip"], ), ] @@ -1002,19 +1294,19 @@ def test_check_recovery_code_user_rate_limited(self, user_service, metrics): test=pretend.call_recorder(lambda uid: False), resets_in=pretend.call_recorder(lambda uid: resets), ) - user_service.ratelimiters["user.login"] = limiter + user_service.ratelimiters["2fa.ip"] = limiter with pytest.raises(TooManyFailedLogins) as excinfo: user_service.check_recovery_code(user.id, "recovery_code") assert excinfo.value.resets_in is resets - assert limiter.test.calls == [pretend.call(user.id)] - assert limiter.resets_in.calls == [pretend.call(user.id)] + assert limiter.test.calls == [pretend.call(REMOTE_ADDR)] + assert limiter.resets_in.calls == [pretend.call(REMOTE_ADDR)] assert metrics.increment.calls == [ pretend.call("warehouse.authentication.recovery_code.start"), pretend.call( "warehouse.authentication.ratelimited", - tags=["mechanism:check_recovery_code", "ratelimiter:user"], + tags=["mechanism:check_recovery_code", "ratelimiter:ip"], ), ] @@ -1221,6 +1513,8 @@ def test_database_login_factory(monkeypatch, pyramid_services, metrics): ip_login_ratelimiter = pretend.stub() email_add_ratelimiter = pretend.stub() password_reset_ratelimiter = pretend.stub() + user_2fa_ratelimiter = pretend.stub() + ip_2fa_ratelimiter = pretend.stub() def find_service(iface, name=None, context=None): if iface != IRateLimiter and name is None: @@ -1234,6 +1528,8 @@ def find_service(iface, name=None, context=None): "ip.login", "email.add", "password.reset", + "2fa.user", + "2fa.ip", } return ( @@ -1243,6 +1539,8 @@ def find_service(iface, name=None, context=None): "ip.login": ip_login_ratelimiter, "email.add": email_add_ratelimiter, "password.reset": password_reset_ratelimiter, + "2fa.user": user_2fa_ratelimiter, + "2fa.ip": ip_2fa_ratelimiter, } ).get(name) @@ -1263,6 +1561,8 @@ def find_service(iface, name=None, context=None): "ip.login": ip_login_ratelimiter, "email.add": email_add_ratelimiter, "password.reset": password_reset_ratelimiter, + "2fa.user": user_2fa_ratelimiter, + "2fa.ip": ip_2fa_ratelimiter, }, ) ] diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 7bdb0d50dddd..c6fb2bf194a6 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -317,6 +317,8 @@ def __init__(self): "warehouse.account.user_login_ratelimit_string": "10 per 5 minutes", "warehouse.account.ip_login_ratelimit_string": "10 per 5 minutes", "warehouse.account.global_login_ratelimit_string": "1000 per 5 minutes", + "warehouse.account.2fa_user_ratelimit_string": "5 per 5 minutes, 20 per hour, 50 per day", # noqa: E501 + "warehouse.account.2fa_ip_ratelimit_string": "10 per 5 minutes, 50 per hour", "warehouse.account.email_add_ratelimit_string": "2 per day", "warehouse.account.verify_email_ratelimit_string": "3 per 6 hours", "warehouse.account.accounts_search_ratelimit_string": "100 per hour", diff --git a/warehouse/accounts/__init__.py b/warehouse/accounts/__init__.py index e752c20adcfb..0765f3bb9058 100644 --- a/warehouse/accounts/__init__.py +++ b/warehouse/accounts/__init__.py @@ -178,6 +178,19 @@ def includeme(config): config.register_service_factory( RateLimit(global_login_ratelimit_string), IRateLimiter, name="global.login" ) + # Register separate rate limiters for 2FA attempts + twofa_user_ratelimit_string = config.registry.settings.get( + "warehouse.account.2fa_user_ratelimit_string" + ) + config.register_service_factory( + RateLimit(twofa_user_ratelimit_string), IRateLimiter, name="2fa.user" + ) + twofa_ip_ratelimit_string = config.registry.settings.get( + "warehouse.account.2fa_ip_ratelimit_string" + ) + config.register_service_factory( + RateLimit(twofa_ip_ratelimit_string), IRateLimiter, name="2fa.ip" + ) email_add_ratelimit_string = config.registry.settings.get( "warehouse.account.email_add_ratelimit_string" ) diff --git a/warehouse/accounts/services.py b/warehouse/accounts/services.py index c45ff94da5b8..b19cc84e4836 100644 --- a/warehouse/accounts/services.py +++ b/warehouse/accounts/services.py @@ -209,6 +209,37 @@ def _hit_ratelimits(self, userid=None): self.ratelimiters["global.login"].hit() self.ratelimiters["ip.login"].hit(self.remote_addr) + def _check_2fa_ratelimits(self, userid: int, tags: list[str] | None = None) -> None: + tags = tags if tags is not None else [] + + # Check IP-based 2FA rate limit + if self.remote_addr is not None: + if not self.ratelimiters["2fa.ip"].test(self.remote_addr): + logger.warning("IP failed 2FA threshold reached.") + self._metrics.increment( + "warehouse.authentication.ratelimited", + tags=tags + ["ratelimiter:ip"], + ) + raise TooManyFailedLogins( + resets_in=self.ratelimiters["2fa.ip"].resets_in(self.remote_addr) + ) + + # Check user-based 2FA rate limit + if not self.ratelimiters["2fa.user"].test(userid): + logger.warning("User failed 2FA threshold reached.") + self._metrics.increment( + "warehouse.authentication.ratelimited", + tags=tags + ["ratelimiter:user"], + ) + raise TooManyFailedLogins( + resets_in=self.ratelimiters["2fa.user"].resets_in(userid) + ) + + def _hit_2fa_ratelimits(self, userid: int) -> None: + self.ratelimiters["2fa.user"].hit(userid) + if self.remote_addr is not None: + self.ratelimiters["2fa.ip"].hit(self.remote_addr) + def check_password(self, userid, password, *, tags=None): tags = tags if tags is not None else [] tags.append("mechanism:check_password") @@ -395,7 +426,7 @@ def get_recovery_codes(self, user_id): # If we've gotten here, then we'll want to record a failed attempt in our # rate limiting before raising an exception to indicate a failed # recovery code verification. - self._hit_ratelimits(userid=user_id) + self._hit_2fa_ratelimits(userid=user_id) raise NoRecoveryCodes def get_recovery_code(self, user_id, code): @@ -415,7 +446,7 @@ def get_recovery_code(self, user_id, code): # If we've gotten here, then we'll want to record a failed attempt in our # rate limiting before returning False to indicate a failed recovery code # verification. - self._hit_ratelimits(userid=user_id) + self._hit_2fa_ratelimits(userid=user_id) raise InvalidRecoveryCode def get_totp_secret(self, user_id): @@ -450,7 +481,7 @@ def check_totp_value(self, user_id, totp_value, *, tags=None): tags.append("mechanism:check_totp_value") self._metrics.increment("warehouse.authentication.two_factor.start", tags=tags) - self._check_ratelimits(userid=user_id, tags=tags) + self._check_2fa_ratelimits(userid=user_id, tags=tags) totp_secret = self.get_totp_secret(user_id) @@ -462,7 +493,7 @@ def check_totp_value(self, user_id, totp_value, *, tags=None): # If we've gotten here, then we'll want to record a failed attempt in our # rate limiting before returning False to indicate a failed totp # verification. - self._hit_ratelimits(userid=user_id) + self._hit_2fa_ratelimits(userid=user_id) return False last_totp_value = self.get_last_totp_value(user_id) @@ -470,20 +501,20 @@ def check_totp_value(self, user_id, totp_value, *, tags=None): if last_totp_value is not None and totp_value == last_totp_value.encode(): return False - valid = otp.verify_totp(totp_secret, totp_value) - - if valid: - self._metrics.increment("warehouse.authentication.two_factor.ok", tags=tags) - else: + try: + if not (valid := otp.verify_totp(totp_secret, totp_value)): + self._hit_2fa_ratelimits(userid=user_id) + except otp.InvalidTOTPError: self._metrics.increment( "warehouse.authentication.two_factor.failure", tags=tags + ["failure_reason:invalid_totp"], ) # If we've gotten here, then we'll want to record a failed attempt in our - # rate limiting before returning False to indicate a failed totp - # verification. - self._hit_ratelimits(userid=user_id) + # rate limiting before raising to indicate a failed totp verification. + self._hit_2fa_ratelimits(userid=user_id) + raise otp.InvalidTOTPError + self._metrics.increment("warehouse.authentication.two_factor.ok", tags=tags) return valid def get_webauthn_credential_options(self, user_id, *, challenge, rp_name, rp_id): @@ -541,11 +572,19 @@ def verify_webauthn_assertion( Returns the updated signage count on success, raises webauthn.AuthenticationRejectedError on failure. """ + # Check rate limits before attempting verification + self._check_2fa_ratelimits(userid=user_id, tags=["mechanism:webauthn"]) + user = self.get_user(user_id) - return webauthn.verify_assertion_response( - assertion, challenge=challenge, user=user, origin=origin, rp_id=rp_id - ) + try: + return webauthn.verify_assertion_response( + assertion, challenge=challenge, user=user, origin=origin, rp_id=rp_id + ) + except webauthn.AuthenticationRejectedError: + # Hit rate limits on failure + self._hit_2fa_ratelimits(userid=user_id) + raise def add_webauthn(self, user_id, **kwargs): """ @@ -607,7 +646,7 @@ def check_recovery_code(self, user_id, code, skip_ratelimits=False): self._metrics.increment("warehouse.authentication.recovery_code.start") if not skip_ratelimits: - self._check_ratelimits( + self._check_2fa_ratelimits( userid=user_id, tags=["mechanism:check_recovery_code"], ) @@ -745,6 +784,10 @@ def database_login_factory(context, request): "user.login": request.find_service( IRateLimiter, name="user.login", context=None ), + "2fa.ip": request.find_service(IRateLimiter, name="2fa.ip", context=None), + "2fa.user": request.find_service( + IRateLimiter, name="2fa.user", context=None + ), "email.add": request.find_service( IRateLimiter, name="email.add", context=None ), diff --git a/warehouse/config.py b/warehouse/config.py index 84a2c97c9a2e..682c3446a41e 100644 --- a/warehouse/config.py +++ b/warehouse/config.py @@ -522,6 +522,19 @@ def configure(settings=None): "GLOBAL_LOGIN_RATELIMIT_STRING", default="1000 per 5 minutes", ) + # Separate rate limiters for 2FA attempts to prevent brute-force attacks + maybe_set( + settings, + "warehouse.account.2fa_user_ratelimit_string", + "2FA_USER_RATELIMIT_STRING", + default="5 per 5 minutes, 20 per hour, 50 per day", + ) + maybe_set( + settings, + "warehouse.account.2fa_ip_ratelimit_string", + "2FA_IP_RATELIMIT_STRING", + default="10 per 5 minutes, 50 per hour", + ) maybe_set( settings, "warehouse.account.email_add_ratelimit_string",