diff --git a/accounts/templates/accounts/auth/sudo.html b/accounts/templates/accounts/auth/sudo.html new file mode 100644 index 0000000..eadafe4 --- /dev/null +++ b/accounts/templates/accounts/auth/sudo.html @@ -0,0 +1,33 @@ +{% extends "common/components/base.html" %} +{% block title %} Confirm Access | Hostea Dashbaord{% endblock %} +{% block nav %} {% include "dash/common/components/primary-nav.html" %} {% endblock %} + +{% block main %} +
+

{{ Title }}

+
+ {% include "common/components/error.html" %} {% csrf_token %} +

Please login to confirm access

+ + + +
+ +
+
+
+{% endblock %} diff --git a/accounts/tests.py b/accounts/tests.py index a8e1ca2..06f44c7 100644 --- a/accounts/tests.py +++ b/accounts/tests.py @@ -19,6 +19,7 @@ import time from django.contrib.auth import get_user_model from django.urls import reverse from django.test import TestCase, Client, override_settings +from django.utils.http import urlencode from django.contrib.auth import authenticate from django.conf import settings @@ -26,6 +27,8 @@ from .models import AccountConfirmChallenge from .management.commands.rm_unverified_users import ( Command as CleanUnverifiedUsersCommand, ) +from .utils import ConfirmAccess +from .decorators import confirm_access def register_util(t: TestCase, username: str): @@ -245,3 +248,131 @@ class UnverifiedAccountCleanupTets(TestCase): self.assertEqual( get_user_model().objects.filter(username=username2).exists(), False ) + + +class SudoWorks(TestCase): + def setUp(self): + self.username = "sudo_useworks" + register_util(t=self, username=self.username) + + def test_sudo_renders(self): + c = Client() + + resp = c.get(reverse("accounts.sudo")) + self.assertEqual(resp.status_code, 302) + self.assertEqual( + resp.headers["location"], + f"/accounts/login/?next={reverse('accounts.sudo')}", + ) + + login_util(t=self, c=c, redirect_to="accounts.home") + + # GET sudo page + ctx = {"next": reverse("accounts.home")} + sudo_path = f"{reverse('accounts.sudo')}?{urlencode(ctx)}" + resp = c.get(sudo_path) + self.assertEqual(resp.status_code, 200) + self.assertEqual(b"Please login to confirm access" in resp.content, True) + + # Success sudo validation + payload = {"password": self.password, "next": ctx["next"]} + resp = c.post(reverse("accounts.sudo"), payload) + self.assertEqual(resp.status_code, 302) + self.assertEqual(resp.headers["location"], ctx["next"]) + + # Fail sudo validation + payload["password"] = self.username + resp = c.post(reverse("accounts.sudo"), payload) + self.assertEqual(resp.status_code, 401) + self.assertEqual(b"Wrong Password" in resp.content, True) + + +max_sudo_ttl = 5 + + +class MockRequest: + def __init__(self, path, session={}): + self.path = path + self.session = session + + +class ConfirmAccessDecorator(TestCase): + + # TODO: override to test TTL + def test_redirect_to_sudo(self): + request = MockRequest(path="/") + resp = ConfirmAccess.redirect_to_sudo(request) + self.assertEqual(resp.status_code, 302) + ctx = {"next": request.path} + self.assertEqual( + resp.headers["location"], f"{reverse('accounts.sudo')}?{urlencode(ctx)}" + ) + + @override_settings(HOSTEA={"ACCOUNTS": {"SUDO_TTL": max_sudo_ttl}}) + def test_is_valid(self): + request = MockRequest(path="/") + + # request doesn't have sudo authorization data + self.assertEqual(ConfirmAccess.is_valid(request), False) + + # authorize sudo + ConfirmAccess.set(request) + + # request has sudo authorization data and is valid for this time duration + self.assertEqual(ConfirmAccess.is_valid(request), True) + + time.sleep(settings.HOSTEA["ACCOUNTS"]["SUDO_TTL"] + 2) + + # request has sudo authorization data and is not valid for this time duration + self.assertEqual(ConfirmAccess.is_valid(request), False) + + def test_validate_decorator_cls_method(self): + req = MockRequest(path="/") + req.session = {} + + def fn(req, *args, **kwargs): + return True + + args = {} + kwargs = {} + + # request doesn't have sudo authorization data and is not valid for this time duration + resp = ConfirmAccess.validate_decorator(req, fn, *args, **kwargs) + self.assertEqual(resp.status_code, 302) + + ctx = {"next": req.path} + self.assertEqual( + resp.headers["location"], f"{reverse('accounts.sudo')}?{urlencode(ctx)}" + ) + + # authorize sudo + ConfirmAccess.set(req) + # request has sudo authorization data and is valid for this time duration + self.assertEqual( + ConfirmAccess.validate_decorator(req, fn, *args, **kwargs), True + ) + + def test_validate_decorator(self): + req = MockRequest(path="/") + req.session = {} + + @confirm_access + def fn(req): + return True + + args = {} + kwargs = {} + + # request doesn't have sudo authorization data and is not valid for this time duration + resp = fn(req) + self.assertEqual(resp.status_code, 302) + + ctx = {"next": req.path} + self.assertEqual( + resp.headers["location"], f"{reverse('accounts.sudo')}?{urlencode(ctx)}" + ) + + # authorize sudo + ConfirmAccess.set(req) + # request has sudo authorization data and is valid for this time duration + self.assertEqual(fn(req), True) diff --git a/accounts/urls.py b/accounts/urls.py index 97b6971..4249dde 100644 --- a/accounts/urls.py +++ b/accounts/urls.py @@ -24,6 +24,7 @@ from .views import ( verify_account, resend_verification_email_view, verification_pending_view, + sudo, ) urlpatterns = [ @@ -42,5 +43,6 @@ urlpatterns = [ name="accounts.verify.resend", ), path("accounts/verify//", verify_account, name="accounts.verify"), + path("accounts/sudo/", sudo, name="accounts.sudo"), path("", protected_view, name="accounts.home"), ] diff --git a/accounts/views.py b/accounts/views.py index 97f87de..3c2ef2b 100644 --- a/accounts/views.py +++ b/accounts/views.py @@ -23,7 +23,7 @@ from django.urls import reverse from .models import AccountConfirmChallenge -from .utils import send_verification_email +from .utils import send_verification_email, ConfirmAccess @csrf_protect @@ -185,3 +185,36 @@ def verify_account(request, challenge): challenge.owned_by.save() challenge.delete() return redirect("accounts.login") + + +@login_required +@csrf_protect +def sudo(request): + def default_login_ctx(): + return { + "title": "Confirm Access", + } + + if request.method == "GET": + ctx = default_login_ctx() + ctx["next"] = request.GET["next"] + return render(request, "accounts/auth/sudo.html", ctx) + + password = request.POST["password"] + user = request.user + + user = authenticate( + username=user.username, + password=request.POST["password"], + ) + if user is None: + ctx = default_login_ctx() + ctx["next"] = request.POST["next"] + ctx["error"] = { + "title": "Wrong Password", + "reason": "Password is incorrect, please try again.", + } + return render(request, "accounts/auth/sudo.html", status=401, context=ctx) + + ConfirmAccess.set(request=request) + return redirect(request.POST["next"])