# Create your tests here. # Copyright © 2022 Aravinth Manivannan # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import time from io import StringIO from django.contrib.auth import get_user_model from django.core.management import call_command from django.urls import reverse from django.test import TestCase, Client, override_settings from django.utils.http import urlencode from django.contrib.auth import authenticate from django.conf import settings from .models import AccountConfirmChallenge from .management.commands.rm_unverified_users import ( Command as CleanUnverifiedUsersCommand, ) from .utils import ConfirmAccess from .decorators import confirm_access def register_util(t: TestCase, username: str): t.password = "password121231" t.username = username t.email = f"{t.username}@example.org" t.user = get_user_model().objects.create( username=t.username, email=t.email, ) t.user.set_password(t.password) t.user.save() def login_util(t: TestCase, c: Client, redirect_to: str): payload = { "login": t.username, "password": t.password, } resp = c.post(reverse("accounts.login"), payload) t.assertEqual(resp.status_code, 302) t.assertEqual(resp.headers["location"], reverse(redirect_to)) class LoginTest(TestCase): """ Tests create new app view """ def setUp(self): self.username = "create_new_app_tests" register_util(t=self, username=self.username) def test_login_template_works(self): """ Tests if login template renders """ resp = self.client.get(reverse("accounts.login")) self.assertEqual(b"Free Forge Ecosystem" in resp.content, True) def test_login_works(self): """ Tests if login template renders """ c = Client() # username login works login_util(t=self, c=c, redirect_to="accounts.home") # email login works payload = { "login": self.email, "password": self.password, } resp = c.post(reverse("accounts.login"), payload) self.assertEqual(resp.status_code, 302) self.assertEqual(resp.headers["location"], reverse("accounts.home")) # authentication failure when wrong credentials are provided payload = { "login": self.email, "password": self.user.email, } resp = self.client.post(reverse("accounts.login"), payload) self.assertEqual(resp.status_code, 401) self.assertEqual(b"Login Failed" in resp.content, True) # protected view works resp = c.get(reverse("accounts.home")) self.assertEqual(resp.status_code, 302) self.assertEqual(resp.headers["location"], reverse("dash.home")) def test_logout_works(self): """ Logout view tests """ c = Client() login_util(t=self, c=c, redirect_to="accounts.home") resp = c.get(reverse("accounts.logout")) self.assertEqual(resp.status_code, 302) self.assertEqual(resp.headers["location"], reverse("accounts.login")) def test_default_login_uri_works(self): """ /accounts/login should redirect_to /login """ c = Client() resp = c.get(reverse("accounts.default_login_url")) self.assertEqual(resp.status_code, 302) self.assertEqual(resp.headers["location"], reverse("accounts.login")) class RegistrationTest(TestCase): def test_register_template_works(self): """ Tests if register template renders """ resp = self.client.get(reverse("accounts.register")) self.assertEqual(b"Free Forge Ecosystem" in resp.content, True) def test_register_works(self): """ Tests if register works """ c = Client() # passwords don't match msg = { "username": "register_user", "password": "password", "email": "register_user@example.com", "confirm_password": "foo@example.com", } resp = c.post(reverse("accounts.register"), msg) self.assertEqual(resp.status_code, 400) # register user msg["confirm_password"] = msg["password"] resp = c.post(reverse("accounts.register"), msg) self.assertEqual(resp.status_code, 302) user = get_user_model().objects.get(username=msg["username"]) self.assertEqual(user.is_active, False) challenge = AccountConfirmChallenge.objects.get(owned_by=user) pending_url = challenge.pending_url() self.assertEqual(resp.headers["location"], pending_url) resend_url = reverse("accounts.verify.resend", args=(challenge.public_ref,)) # visit pending URL resp = c.get(pending_url) self.assertEqual(resp.status_code, 200) self.assertEqual(str.encode(msg["email"]) in resp.content, True) self.assertEqual(str.encode(resend_url) in resp.content, True) resp = c.post(resend_url) self.assertEqual(resp.status_code, 302) self.assertEqual(resp.headers["location"], pending_url) resp = c.get(challenge.verification_link()) self.assertEqual(resp.status_code, 200) self.assertEqual( str.encode(challenge.verification_link()) in resp.content, True ) resp = c.post(challenge.verification_link()) self.assertEqual(resp.status_code, 302) self.assertEqual(resp.headers["location"], reverse("accounts.login")) user.refresh_from_db() self.assertEqual(user.is_active, True) old_email = msg["email"] # duplicate username msg["email"] = "new_register_user_email@example.com" resp = c.post(reverse("accounts.register"), msg) self.assertEqual(resp.status_code, 400) self.assertEqual(b"Username is already registered" in resp.content, True) # duplicate email msg["username"] = "new_register_user_email" msg["email"] = old_email resp = c.post(reverse("accounts.register"), msg) self.assertEqual(resp.status_code, 400) self.assertEqual(b"Email is already registered" in resp.content, True) class UnverifiedAccountCleanupTets(TestCase): """ Tests account clean up management command """ @override_settings(HOSTEA={"ACCOUNTS": {"MAX_VERIFICATION_TOLERANCE_PERIOD": 0}}) def test_register_works(self): """ Tests if register works """ c = Client() username1 = "cleanup_user1" username2 = "cleanup_user2" # passwords don't match msg = { "username": username1, "password": "password", "email": f"{username1}@example.com", "confirm_password": "password", } # register user resp = c.post(reverse("accounts.register"), msg) self.assertEqual(resp.status_code, 302) msg["username"] = username2 msg["email"] = f"{username2}@example.org" resp = c.post(reverse("accounts.register"), msg) self.assertEqual(resp.status_code, 302) user1 = get_user_model().objects.get(username=username1) user1.is_active = True user1.save() if settings.HOSTEA["ACCOUNTS"]["MAX_VERIFICATION_TOLERANCE_PERIOD"] > 10: raise Exception( "This test requires MAX_VERIFICATION_TOLERANCE_PERIOD to be less, ideally less 10" ) time.sleep(settings.HOSTEA["ACCOUNTS"]["MAX_VERIFICATION_TOLERANCE_PERIOD"] + 2) cmd = CleanUnverifiedUsersCommand() cmd.handle() self.assertEqual( get_user_model().objects.filter(username=username1).exists(), True ) self.assertEqual( get_user_model().objects.filter(username=username2).exists(), False ) class SudoWorks(TestCase): def setUp(self): self.username = "sudo_useworks" register_util(t=self, username=self.username) def test_sudo_renders(self): c = Client() resp = c.get(reverse("accounts.sudo")) self.assertEqual(resp.status_code, 302) self.assertEqual( resp.headers["location"], f"/accounts/login/?next={reverse('accounts.sudo')}", ) login_util(t=self, c=c, redirect_to="accounts.home") # GET sudo page ctx = {"next": reverse("accounts.home")} sudo_path = f"{reverse('accounts.sudo')}?{urlencode(ctx)}" resp = c.get(sudo_path) self.assertEqual(resp.status_code, 200) self.assertEqual(b"Please login to confirm access" in resp.content, True) # Success sudo validation payload = {"password": self.password, "next": ctx["next"]} resp = c.post(reverse("accounts.sudo"), payload) self.assertEqual(resp.status_code, 302) self.assertEqual(resp.headers["location"], ctx["next"]) # Fail sudo validation payload["password"] = self.username resp = c.post(reverse("accounts.sudo"), payload) self.assertEqual(resp.status_code, 401) self.assertEqual(b"Wrong Password" in resp.content, True) max_sudo_ttl = 5 class MockRequest: def __init__(self, path, session={}): self.path = path self.session = session class ConfirmAccessDecorator(TestCase): # TODO: override to test TTL def test_redirect_to_sudo(self): request = MockRequest(path="/") resp = ConfirmAccess.redirect_to_sudo(request) self.assertEqual(resp.status_code, 302) ctx = {"next": request.path} self.assertEqual( resp.headers["location"], f"{reverse('accounts.sudo')}?{urlencode(ctx)}" ) @override_settings(HOSTEA={"ACCOUNTS": {"SUDO_TTL": max_sudo_ttl}}) def test_is_valid(self): request = MockRequest(path="/") # request doesn't have sudo authorization data self.assertEqual(ConfirmAccess.is_valid(request), False) # authorize sudo ConfirmAccess.set(request) # request has sudo authorization data and is valid for this time duration self.assertEqual(ConfirmAccess.is_valid(request), True) time.sleep(settings.HOSTEA["ACCOUNTS"]["SUDO_TTL"] + 2) # request has sudo authorization data and is not valid for this time duration self.assertEqual(ConfirmAccess.is_valid(request), False) def test_validate_decorator_cls_method(self): req = MockRequest(path="/") req.session = {} def fn(req, *args, **kwargs): return True args = {} kwargs = {} # request doesn't have sudo authorization data and is not valid for this time duration resp = ConfirmAccess.validate_decorator(req, fn, *args, **kwargs) self.assertEqual(resp.status_code, 302) ctx = {"next": req.path} self.assertEqual( resp.headers["location"], f"{reverse('accounts.sudo')}?{urlencode(ctx)}" ) # authorize sudo ConfirmAccess.set(req) # request has sudo authorization data and is valid for this time duration self.assertEqual( ConfirmAccess.validate_decorator(req, fn, *args, **kwargs), True ) def test_validate_decorator(self): req = MockRequest(path="/") req.session = {} @confirm_access def fn(req): return True args = {} kwargs = {} # request doesn't have sudo authorization data and is not valid for this time duration resp = fn(req) self.assertEqual(resp.status_code, 302) ctx = {"next": req.path} self.assertEqual( resp.headers["location"], f"{reverse('accounts.sudo')}?{urlencode(ctx)}" ) # authorize sudo ConfirmAccess.set(req) # request has sudo authorization data and is valid for this time duration self.assertEqual(fn(req), True) class GetUserIDTest(TestCase): def setUp(self): self.username = "getuserid" register_util(t=self, username=self.username) def test_command_output(self): stdout = StringIO() stderr = StringIO() # username exists call_command(f"get_user_id {self.username}", stdout=stdout, stderr=stderr) self.assertIn(self.user.id, out.getvalue()) # username doesn't exist nouser = "fooabr" call_command(f"get_user_id {nouser}", stdout=out, stderr=stderr) self.assertIn(f"username {nouser} doesn't exist", stderr.getvalue())