fix(api): rate-limit magic-code verify, bound per-token attempts (GHSA-9pvm-fcf6-9234) (#9130)

* fix(api): rate-limit magic-code verification and bound per-token attempts

The magic-link sign-in / sign-up endpoints accept a 6-digit numeric code
(900k-value space, 600s TTL) but never increment a failure counter on a
wrong-code verify and extend django.views.View rather than DRF APIView,
so DRF's AuthenticationThrottle never runs against them. The space-side
generate endpoint also lacked throttle_classes. Combined, this allowed
an unauthenticated attacker who knew a victim's email to brute-force
the code within the TTL window and log in as the victim.

- Add MAX_VERIFY_ATTEMPTS=5 in MagicCodeProvider.set_user_data: failed
  comparisons now persist verify_attempts in Redis under the remaining
  TTL and, on hitting the limit, delete the key and raise
  EMAIL_CODE_ATTEMPT_EXHAUSTED. This is the load-bearing fix - it caps
  total attempts per issued token regardless of request rate.
- Add authentication_throttle_allows() so plain Django Views can apply
  AuthenticationThrottle without converting to APIView (would change
  CSRF + request-parsing semantics for the redirect-flow endpoints).
- Apply the throttle to MagicSignIn/UpEndpoint and the space variants;
  add throttle_classes to MagicGenerateSpaceEndpoint to match its app
  sibling.

Refs GHSA-9pvm-fcf6-9234.

* fix(api): make verify-attempt increment atomic, expose throttle rate via env

Address PR review feedback:

- Replace the JSON read-modify-write of verify_attempts with a Lua
  EVAL script that INCRs a dedicated counter key and EXPIREs it only
  on the first increment. The previous round-trip was racy: parallel
  wrong-code requests could read the same value and both write the
  same incremented count, letting an attacker exceed MAX_VERIFY_ATTEMPTS
  under concurrency. Counter is now reset on each new token issuance
  and cleared on successful verify / exhaustion.
- Make AuthenticationThrottle.rate configurable via the
  AUTHENTICATION_RATE_LIMIT env var (default 10/minute, down from 30
  to tighten the budget on unauth auth-adjacent endpoints). Document
  it in deployments/aio and deployments/cli variables.env.

* test(api): cover magic-code attempt cap, counter reset, and auth throttle

Add the contract tests called out in the PR test plan:

- TestMagicSignInVerifyAttempts:
  - test_exhausted_after_max_wrong_attempts: after MAX_VERIFY_ATTEMPTS
    wrong codes the next verify redirects with EMAIL_CODE_ATTEMPT_
    EXHAUSTED_SIGN_IN and both Redis keys are deleted; a follow-up
    verify reports EXPIRED.
  - test_counter_increments_on_each_wrong_attempt: the dedicated
    verify_attempts counter advances by exactly one per wrong POST,
    matching the atomic Lua INCR.
  - test_counter_resets_on_token_regeneration: regenerating the
    magic-link clears the counter so the user isn't pre-locked-out by
    a prior session's wrong attempts.
- TestMagicSignUpVerifyAttempts.test_signup_exhausted_after_max_wrong_attempts:
  the sign-up endpoint returns EMAIL_CODE_ATTEMPT_EXHAUSTED_SIGN_UP on
  the exhausting attempt.
- TestAuthenticationThrottle: exercises authentication_throttle_allows
  on the plain-View redirect-flow endpoints by patching the rate down
  and asserting RATE_LIMIT_EXCEEDED is appended to the redirect URL
  once the per-IP budget is exceeded, for both magic-sign-in and
  magic-sign-up.

Each new class clears Django cache (DRF throttle storage) and the
per-email Redis keys around every test so runs are independent.

* fix(api): clamp remaining_ttl to >=1 for verify-attempt counter EXPIRE

ri.ttl() returns 0 when the token has less than one second remaining
(Redis floors to whole seconds). The previous clamp only caught
None and < 0, so a sub-second TTL would pass through and the Lua
script's EXPIRE counter 0 would immediately delete the key — letting
an attacker bypass MAX_VERIFY_ATTEMPTS during the final second of the
token's life. Switch the comparison to <= 0.

Narrow real-world impact (sub-second window, throttle still bounds
the rate) but the cap should hold regardless of timing.
This commit is contained in:
sriram veeraghanta
2026-06-01 18:44:57 +05:30
committed by GitHub
parent 7ec8d4990f
commit b1c78fe4c8
7 changed files with 354 additions and 4 deletions
@@ -22,6 +22,27 @@ from plane.db.models import User
class MagicCodeProvider(CredentialAdapter): class MagicCodeProvider(CredentialAdapter):
provider = "magic-code" provider = "magic-code"
# Max wrong-code verification attempts per issued token before the token
# is invalidated. Prevents brute-forcing the 6-digit code space within
# the token TTL window.
MAX_VERIFY_ATTEMPTS = 5
# Atomic INCR + first-time EXPIRE for the verify-attempt counter.
# Using a dedicated counter key with this script makes the increment
# safe under concurrent wrong-code requests; a plain JSON read/modify/
# write would race and let parallel attackers exceed the cap.
_INCREMENT_VERIFY_ATTEMPTS_SCRIPT = (
'local count = redis.call("INCR", KEYS[1]) '
'if count == 1 then '
' redis.call("EXPIRE", KEYS[1], tonumber(ARGV[1])) '
'end '
'return count'
)
@staticmethod
def _verify_attempts_key(token_key):
return f"{token_key}:verify_attempts"
def __init__(self, request, key, code=None, callback=None): def __init__(self, request, key, code=None, callback=None):
(EMAIL_HOST, ENABLE_MAGIC_LINK_LOGIN) = get_configuration_value( (EMAIL_HOST, ENABLE_MAGIC_LINK_LOGIN) = get_configuration_value(
[ [
@@ -92,6 +113,9 @@ class MagicCodeProvider(CredentialAdapter):
expiry = 600 expiry = 600
ri.set(key, json.dumps(value), ex=expiry) ri.set(key, json.dumps(value), ex=expiry)
# Reset the verify-attempt counter so each newly issued token starts
# with a fresh budget of MAX_VERIFY_ATTEMPTS.
ri.delete(self._verify_attempts_key(key))
return key, token return key, token
def set_user_data(self): def set_user_data(self):
@@ -114,12 +138,52 @@ class MagicCodeProvider(CredentialAdapter):
}, },
} }
) )
# Delete the token from redis if the code match is successful # Delete the token and its counter from redis on success.
ri.delete(self.key) ri.delete(self.key)
ri.delete(self._verify_attempts_key(self.key))
return return
else: else:
email = str(self.key).replace("magic_", "", 1) email = str(self.key).replace("magic_", "", 1)
if User.objects.filter(email=email).exists(): user_exists = User.objects.filter(email=email).exists()
# Atomically increment the verify-attempt counter in Redis.
# The Lua script sets the TTL only on the first increment so
# the lockout window matches the remaining token TTL and does
# not get extended by every wrong-code attempt.
# ri.ttl() returns -2 (missing), -1 (no expiry), 0 (sub-second
# remaining; Redis floors to whole seconds), or a positive int.
# Clamp to >=1 because EXPIRE key 0 immediately deletes the key
# and would let an attacker bypass the cap in the final second.
remaining_ttl = ri.ttl(self.key)
if remaining_ttl is None or remaining_ttl <= 0:
remaining_ttl = 1
verify_attempts = int(
ri.eval(
self._INCREMENT_VERIFY_ATTEMPTS_SCRIPT,
1,
self._verify_attempts_key(self.key),
remaining_ttl,
)
)
if verify_attempts >= self.MAX_VERIFY_ATTEMPTS:
# Invalidate the token (and counter) so further attempts
# must regenerate; regeneration is itself attempt-counted.
ri.delete(self.key)
ri.delete(self._verify_attempts_key(self.key))
if user_exists:
raise AuthenticationException(
error_code=AUTHENTICATION_ERROR_CODES["EMAIL_CODE_ATTEMPT_EXHAUSTED_SIGN_IN"],
error_message="EMAIL_CODE_ATTEMPT_EXHAUSTED_SIGN_IN",
payload={"email": str(email)},
)
raise AuthenticationException(
error_code=AUTHENTICATION_ERROR_CODES["EMAIL_CODE_ATTEMPT_EXHAUSTED_SIGN_UP"],
error_message="EMAIL_CODE_ATTEMPT_EXHAUSTED_SIGN_UP",
payload={"email": str(email)},
)
if user_exists:
raise AuthenticationException( raise AuthenticationException(
error_code=AUTHENTICATION_ERROR_CODES["INVALID_MAGIC_CODE_SIGN_IN"], error_code=AUTHENTICATION_ERROR_CODES["INVALID_MAGIC_CODE_SIGN_IN"],
error_message="INVALID_MAGIC_CODE_SIGN_IN", error_message="INVALID_MAGIC_CODE_SIGN_IN",
+22 -1
View File
@@ -2,6 +2,9 @@
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
# See the LICENSE file for details. # See the LICENSE file for details.
# Python imports
import os
# Third party imports # Third party imports
from rest_framework.throttling import AnonRateThrottle, UserRateThrottle from rest_framework.throttling import AnonRateThrottle, UserRateThrottle
from rest_framework import status from rest_framework import status
@@ -15,7 +18,9 @@ from plane.authentication.adapter.error import (
class AuthenticationThrottle(AnonRateThrottle): class AuthenticationThrottle(AnonRateThrottle):
rate = "30/minute" # Rate is configurable per-deployment via the AUTHENTICATION_RATE_LIMIT
# env var (DRF format: "<num>/<period>" where period is second/minute/hour/day).
rate = os.environ.get("AUTHENTICATION_RATE_LIMIT", "10/minute")
scope = "authentication" scope = "authentication"
def throttle_failure_view(self, request, *args, **kwargs): def throttle_failure_view(self, request, *args, **kwargs):
@@ -28,6 +33,22 @@ class AuthenticationThrottle(AnonRateThrottle):
return Response(e.get_error_dict(), status=status.HTTP_429_TOO_MANY_REQUESTS) return Response(e.get_error_dict(), status=status.HTTP_429_TOO_MANY_REQUESTS)
def authentication_throttle_allows(request):
"""
Apply AuthenticationThrottle to a plain django.views.View request.
DRF's throttle_classes only run inside APIView.initial(); the magic
sign-in / sign-up endpoints extend django.views.View to return
HttpResponseRedirect from a form POST flow, so they need a manual
throttle check. Returns True if the request is allowed through,
False if it should be rejected with a RATE_LIMIT_EXCEEDED error.
"""
throttle = AuthenticationThrottle()
# SimpleRateThrottle.allow_request only reads request.META and
# request.user, both available on a plain Django HttpRequest.
return throttle.allow_request(request, None)
class EmailVerificationThrottle(UserRateThrottle): class EmailVerificationThrottle(UserRateThrottle):
""" """
Throttle for email verification code generation. Throttle for email verification code generation.
@@ -26,7 +26,10 @@ from plane.authentication.adapter.error import (
AuthenticationException, AuthenticationException,
AUTHENTICATION_ERROR_CODES, AUTHENTICATION_ERROR_CODES,
) )
from plane.authentication.rate_limit import AuthenticationThrottle from plane.authentication.rate_limit import (
AuthenticationThrottle,
authentication_throttle_allows,
)
from plane.utils.path_validator import get_safe_redirect_url from plane.utils.path_validator import get_safe_redirect_url
@@ -65,6 +68,18 @@ class MagicSignInEndpoint(View):
email = request.POST.get("email", "").strip().lower() email = request.POST.get("email", "").strip().lower()
next_path = request.POST.get("next_path") next_path = request.POST.get("next_path")
if not authentication_throttle_allows(request):
exc = AuthenticationException(
error_code=AUTHENTICATION_ERROR_CODES["RATE_LIMIT_EXCEEDED"],
error_message="RATE_LIMIT_EXCEEDED",
)
url = get_safe_redirect_url(
base_url=base_host(request=request, is_app=True),
next_path=next_path,
params=exc.get_error_dict(),
)
return HttpResponseRedirect(url)
if code == "" or email == "": if code == "" or email == "":
exc = AuthenticationException( exc = AuthenticationException(
error_code=AUTHENTICATION_ERROR_CODES["MAGIC_SIGN_IN_EMAIL_CODE_REQUIRED"], error_code=AUTHENTICATION_ERROR_CODES["MAGIC_SIGN_IN_EMAIL_CODE_REQUIRED"],
@@ -136,6 +151,18 @@ class MagicSignUpEndpoint(View):
email = request.POST.get("email", "").strip().lower() email = request.POST.get("email", "").strip().lower()
next_path = request.POST.get("next_path") next_path = request.POST.get("next_path")
if not authentication_throttle_allows(request):
exc = AuthenticationException(
error_code=AUTHENTICATION_ERROR_CODES["RATE_LIMIT_EXCEEDED"],
error_message="RATE_LIMIT_EXCEEDED",
)
url = get_safe_redirect_url(
base_url=base_host(request=request, is_app=True),
next_path=next_path,
params=exc.get_error_dict(),
)
return HttpResponseRedirect(url)
if code == "" or email == "": if code == "" or email == "":
exc = AuthenticationException( exc = AuthenticationException(
error_code=AUTHENTICATION_ERROR_CODES["MAGIC_SIGN_UP_EMAIL_CODE_REQUIRED"], error_code=AUTHENTICATION_ERROR_CODES["MAGIC_SIGN_UP_EMAIL_CODE_REQUIRED"],
@@ -25,12 +25,18 @@ from plane.authentication.adapter.error import (
AuthenticationException, AuthenticationException,
AUTHENTICATION_ERROR_CODES, AUTHENTICATION_ERROR_CODES,
) )
from plane.authentication.rate_limit import (
AuthenticationThrottle,
authentication_throttle_allows,
)
from plane.utils.path_validator import get_safe_redirect_url, validate_next_path, get_allowed_hosts from plane.utils.path_validator import get_safe_redirect_url, validate_next_path, get_allowed_hosts
class MagicGenerateSpaceEndpoint(APIView): class MagicGenerateSpaceEndpoint(APIView):
permission_classes = [AllowAny] permission_classes = [AllowAny]
throttle_classes = [AuthenticationThrottle]
def post(self, request): def post(self, request):
# Check if instance is configured # Check if instance is configured
instance = Instance.objects.first() instance = Instance.objects.first()
@@ -60,6 +66,18 @@ class MagicSignInSpaceEndpoint(View):
email = request.POST.get("email", "").strip().lower() email = request.POST.get("email", "").strip().lower()
next_path = request.POST.get("next_path") next_path = request.POST.get("next_path")
if not authentication_throttle_allows(request):
exc = AuthenticationException(
error_code=AUTHENTICATION_ERROR_CODES["RATE_LIMIT_EXCEEDED"],
error_message="RATE_LIMIT_EXCEEDED",
)
url = get_safe_redirect_url(
base_url=base_host(request=request, is_space=True),
next_path=next_path,
params=exc.get_error_dict(),
)
return HttpResponseRedirect(url)
if code == "" or email == "": if code == "" or email == "":
exc = AuthenticationException( exc = AuthenticationException(
error_code=AUTHENTICATION_ERROR_CODES["MAGIC_SIGN_IN_EMAIL_CODE_REQUIRED"], error_code=AUTHENTICATION_ERROR_CODES["MAGIC_SIGN_IN_EMAIL_CODE_REQUIRED"],
@@ -119,6 +137,18 @@ class MagicSignUpSpaceEndpoint(View):
email = request.POST.get("email", "").strip().lower() email = request.POST.get("email", "").strip().lower()
next_path = request.POST.get("next_path") next_path = request.POST.get("next_path")
if not authentication_throttle_allows(request):
exc = AuthenticationException(
error_code=AUTHENTICATION_ERROR_CODES["RATE_LIMIT_EXCEEDED"],
error_message="RATE_LIMIT_EXCEEDED",
)
url = get_safe_redirect_url(
base_url=base_host(request=request, is_space=True),
next_path=next_path,
params=exc.get_error_dict(),
)
return HttpResponseRedirect(url)
if code == "" or email == "": if code == "" or email == "":
exc = AuthenticationException( exc = AuthenticationException(
error_code=AUTHENTICATION_ERROR_CODES["MAGIC_SIGN_UP_EMAIL_CODE_REQUIRED"], error_code=AUTHENTICATION_ERROR_CODES["MAGIC_SIGN_UP_EMAIL_CODE_REQUIRED"],
@@ -5,6 +5,7 @@
import json import json
import uuid import uuid
import pytest import pytest
from django.core.cache import cache
from django.urls import reverse from django.urls import reverse
from django.utils import timezone from django.utils import timezone
from rest_framework import status from rest_framework import status
@@ -12,6 +13,8 @@ from django.test import Client
from django.core.exceptions import ValidationError from django.core.exceptions import ValidationError
from unittest.mock import patch from unittest.mock import patch
from plane.authentication.provider.credentials.magic_code import MagicCodeProvider
from plane.authentication.rate_limit import AuthenticationThrottle
from plane.db.models import User from plane.db.models import User
from plane.settings.redis import redis_instance from plane.settings.redis import redis_instance
from plane.license.models import Instance from plane.license.models import Instance
@@ -428,3 +431,198 @@ class TestMagicSignUp:
# Check if user is authenticated # Check if user is authenticated
assert "_auth_user_id" in django_client.session assert "_auth_user_id" in django_client.session
def _generate_magic_token(api_client, email):
"""Hit /magic-generate/ for `email` and return the token that landed in Redis."""
gen_url = reverse("magic-generate")
response = api_client.post(gen_url, {"email": email}, format="json")
assert response.status_code == status.HTTP_200_OK
ri = redis_instance()
return json.loads(ri.get(f"magic_{email}"))["token"]
@pytest.mark.contract
class TestMagicSignInVerifyAttempts:
"""Per-token wrong-code attempt counter and exhaustion behavior (GHSA-9pvm-fcf6-9234)."""
EMAIL = "verify-attempts@plane.so"
@pytest.fixture
def setup_user(self, db):
user = User.objects.create(email=self.EMAIL)
user.set_password("user@123")
user.save()
return user
@pytest.fixture(autouse=True)
def _clear_state(self):
"""Reset throttle cache and magic-link redis state between tests in this class."""
cache.clear()
ri = redis_instance()
ri.delete(f"magic_{self.EMAIL}")
ri.delete(f"magic_{self.EMAIL}:verify_attempts")
yield
cache.clear()
ri.delete(f"magic_{self.EMAIL}")
ri.delete(f"magic_{self.EMAIL}:verify_attempts")
@pytest.mark.django_db
@patch("plane.bgtasks.magic_link_code_task.magic_link.delay")
def test_exhausted_after_max_wrong_attempts(
self, mock_magic_link, django_client, api_client, setup_user, setup_instance
):
"""
After MAX_VERIFY_ATTEMPTS wrong codes the next verify must redirect with
EMAIL_CODE_ATTEMPT_EXHAUSTED_SIGN_IN and both Redis keys must be gone.
With MAX_VERIFY_ATTEMPTS=5 the 5th wrong attempt itself triggers exhaustion
(4 INVALID + 1 EXHAUSTED), matching the >= check in set_user_data.
"""
_generate_magic_token(api_client, self.EMAIL)
url = reverse("magic-sign-in")
ri = redis_instance()
# First (MAX-1) wrong attempts: each redirects with INVALID_MAGIC_CODE_SIGN_IN.
for i in range(MagicCodeProvider.MAX_VERIFY_ATTEMPTS - 1):
response = django_client.post(url, {"email": self.EMAIL, "code": "000000"}, follow=False)
assert response.status_code == 302, f"attempt {i+1} unexpected status"
assert "INVALID_MAGIC_CODE_SIGN_IN" in response.url, f"attempt {i+1} did not return INVALID"
# Token and counter both still live, with counter at MAX-1.
assert ri.exists(f"magic_{self.EMAIL}")
assert int(ri.get(f"magic_{self.EMAIL}:verify_attempts")) == MagicCodeProvider.MAX_VERIFY_ATTEMPTS - 1
# The MAX-th wrong attempt is the exhausting one.
response = django_client.post(url, {"email": self.EMAIL, "code": "000000"}, follow=False)
assert response.status_code == 302
assert "EMAIL_CODE_ATTEMPT_EXHAUSTED_SIGN_IN" in response.url
# Both the token and the counter must be deleted.
assert not ri.exists(f"magic_{self.EMAIL}")
assert not ri.exists(f"magic_{self.EMAIL}:verify_attempts")
# Follow-up verify now sees the key as missing and reports EXPIRED.
response = django_client.post(url, {"email": self.EMAIL, "code": "000000"}, follow=False)
assert response.status_code == 302
assert "EXPIRED_MAGIC_CODE_SIGN_IN" in response.url
@pytest.mark.django_db
@patch("plane.bgtasks.magic_link_code_task.magic_link.delay")
def test_counter_increments_on_each_wrong_attempt(
self, mock_magic_link, django_client, api_client, setup_user, setup_instance
):
"""The verify_attempts counter increments by exactly one per wrong-code POST."""
_generate_magic_token(api_client, self.EMAIL)
url = reverse("magic-sign-in")
ri = redis_instance()
counter_key = f"magic_{self.EMAIL}:verify_attempts"
# Before any wrong attempt the counter does not exist (Lua INCR creates it).
assert not ri.exists(counter_key)
for expected in range(1, MagicCodeProvider.MAX_VERIFY_ATTEMPTS):
django_client.post(url, {"email": self.EMAIL, "code": "000000"}, follow=False)
assert int(ri.get(counter_key)) == expected, f"counter mismatch after {expected} attempts"
@pytest.mark.django_db
@patch("plane.bgtasks.magic_link_code_task.magic_link.delay")
def test_counter_resets_on_token_regeneration(
self, mock_magic_link, django_client, api_client, setup_user, setup_instance
):
"""
Regenerating the magic-link must reset the verify-attempt counter so the
user isn't pre-locked-out by a previous session's wrong attempts.
"""
_generate_magic_token(api_client, self.EMAIL)
url = reverse("magic-sign-in")
ri = redis_instance()
counter_key = f"magic_{self.EMAIL}:verify_attempts"
for _ in range(MagicCodeProvider.MAX_VERIFY_ATTEMPTS - 2):
django_client.post(url, {"email": self.EMAIL, "code": "000000"}, follow=False)
assert int(ri.get(counter_key)) == MagicCodeProvider.MAX_VERIFY_ATTEMPTS - 2
# Regenerate the magic-link — the counter should be cleared.
_generate_magic_token(api_client, self.EMAIL)
assert not ri.exists(counter_key)
# Fresh wrong attempt now produces INVALID (not EXHAUSTED) and counter starts at 1.
response = django_client.post(url, {"email": self.EMAIL, "code": "000000"}, follow=False)
assert "INVALID_MAGIC_CODE_SIGN_IN" in response.url
assert int(ri.get(counter_key)) == 1
@pytest.mark.contract
class TestMagicSignUpVerifyAttempts:
"""Sign-up flow gets the same per-token attempt cap (no existing User row)."""
EMAIL = "signup-verify-attempts@plane.so"
@pytest.fixture(autouse=True)
def _clear_state(self):
cache.clear()
ri = redis_instance()
ri.delete(f"magic_{self.EMAIL}")
ri.delete(f"magic_{self.EMAIL}:verify_attempts")
yield
cache.clear()
ri.delete(f"magic_{self.EMAIL}")
ri.delete(f"magic_{self.EMAIL}:verify_attempts")
@pytest.mark.django_db
@patch("plane.bgtasks.magic_link_code_task.magic_link.delay")
def test_signup_exhausted_after_max_wrong_attempts(
self, mock_magic_link, django_client, api_client, setup_instance
):
"""The MAX-th wrong code on the sign-up endpoint returns the SIGN_UP variant of EXHAUSTED."""
_generate_magic_token(api_client, self.EMAIL)
url = reverse("magic-sign-up")
ri = redis_instance()
for _ in range(MagicCodeProvider.MAX_VERIFY_ATTEMPTS - 1):
response = django_client.post(url, {"email": self.EMAIL, "code": "000000"}, follow=False)
assert "INVALID_MAGIC_CODE_SIGN_UP" in response.url
response = django_client.post(url, {"email": self.EMAIL, "code": "000000"}, follow=False)
assert "EMAIL_CODE_ATTEMPT_EXHAUSTED_SIGN_UP" in response.url
assert not ri.exists(f"magic_{self.EMAIL}")
assert not ri.exists(f"magic_{self.EMAIL}:verify_attempts")
@pytest.mark.contract
class TestAuthenticationThrottle:
"""Per-IP throttle on the redirect-flow magic-link endpoints."""
@pytest.fixture(autouse=True)
def _clear_state(self):
cache.clear()
yield
cache.clear()
@pytest.mark.django_db
def test_magic_sign_in_throttled(self, django_client, setup_instance):
"""Posting past the configured rate from one IP returns RATE_LIMIT_EXCEEDED."""
url = reverse("magic-sign-in")
# Drop the rate so the test doesn't have to fire 10+ requests.
with patch.object(AuthenticationThrottle, "rate", "2/minute"):
for _ in range(2):
response = django_client.post(url, {"email": "throttle@plane.so", "code": "000000"}, follow=False)
assert response.status_code == 302
assert "RATE_LIMIT_EXCEEDED" not in response.url
# The 3rd request from the same IP within the window trips the throttle.
response = django_client.post(url, {"email": "throttle@plane.so", "code": "000000"}, follow=False)
assert response.status_code == 302
assert "RATE_LIMIT_EXCEEDED" in response.url
@pytest.mark.django_db
def test_magic_sign_up_throttled(self, django_client, setup_instance):
"""The sign-up sibling shares the same scope and trips on the same per-IP budget."""
url = reverse("magic-sign-up")
with patch.object(AuthenticationThrottle, "rate", "1/minute"):
response = django_client.post(url, {"email": "throttle-up@plane.so", "code": "000000"}, follow=False)
assert "RATE_LIMIT_EXCEEDED" not in response.url
response = django_client.post(url, {"email": "throttle-up@plane.so", "code": "000000"}, follow=False)
assert "RATE_LIMIT_EXCEEDED" in response.url
+5
View File
@@ -49,6 +49,11 @@ MINIO_ENDPOINT_SSL=0
# API key rate limit # API key rate limit
API_KEY_RATE_LIMIT=60/minute API_KEY_RATE_LIMIT=60/minute
# Per-IP throttle for anonymous authentication endpoints (magic-link
# generate / sign-in / sign-up, email sign-in). DRF format: "<n>/<period>"
# where period is second/minute/hour/day.
AUTHENTICATION_RATE_LIMIT=10/minute
# Live Server Secret Key # Live Server Secret Key
LIVE_SERVER_SECRET_KEY=htbqvBJAgpm9bzvf3r4urJer0ENReatceh LIVE_SERVER_SECRET_KEY=htbqvBJAgpm9bzvf3r4urJer0ENReatceh
+5
View File
@@ -77,6 +77,11 @@ MINIO_ENDPOINT_SSL=0
# API key rate limit # API key rate limit
API_KEY_RATE_LIMIT=60/minute API_KEY_RATE_LIMIT=60/minute
# Per-IP throttle for anonymous authentication endpoints (magic-link
# generate / sign-in / sign-up, email sign-in). DRF format: "<n>/<period>"
# where period is second/minute/hour/day.
AUTHENTICATION_RATE_LIMIT=10/minute
# Live server environment variables # Live server environment variables
# WARNING: You must set a secure value for LIVE_SERVER_SECRET_KEY in production environments. # WARNING: You must set a secure value for LIVE_SERVER_SECRET_KEY in production environments.
LIVE_SERVER_SECRET_KEY= LIVE_SERVER_SECRET_KEY=