dashboard/accounts/tests.py

478 lines
15 KiB
Python

# Create your tests here.
# Copyright © 2022 Aravinth Manivannan <realaravinth@batsense.net>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
import os
from io import StringIO
from urllib.parse import urlparse, urlunparse
import requests
from django.core import mail
from django.contrib.auth import get_user_model
from django.core.management import call_command
from django.urls import reverse
from django.test import TestCase, Client, override_settings
from django.utils.http import urlencode
from django.contrib.auth import authenticate
from django.conf import settings
from oauth2_provider.models import get_application_model
from .models import AccountConfirmChallenge
from .management.commands.rm_unverified_users import (
Command as CleanUnverifiedUsersCommand,
)
from .utils import ConfirmAccess
from .decorators import confirm_access
def register_util(t: TestCase, username: str):
    """
    Create a user for the test case ``t`` and record its credentials on ``t``.

    After the call ``t.username``, ``t.password``, ``t.email`` and ``t.user``
    are set; the e-mail address is ``<username>@example.org``.
    """
    t.username = username
    t.password = "asdklfja;ldkfja;df"
    t.email = f"{t.username}@example.org"

    user_model = get_user_model()
    t.user = user_model.objects.create(username=t.username, email=t.email)
    # The password is applied through set_password() and persisted by a second save.
    t.user.set_password(t.password)
    t.user.save()
def login_util(t: TestCase, c: Client, redirect_to: str):
    """
    Log the user stored on ``t`` in through client ``c``.

    Posts ``t.username`` / ``t.password`` to the login view and asserts that
    the response is a 302 whose ``location`` header equals the URL named by
    ``redirect_to``.
    """
    credentials = {"login": t.username, "password": t.password}
    resp = c.post(reverse("accounts.login"), credentials)

    t.assertEqual(resp.status_code, 302)
    expected_location = reverse(redirect_to)
    t.assertEqual(resp.headers["location"], expected_location)
class LoginTest(TestCase):
    """
    Tests the login, logout and login-redirect views
    """

    def setUp(self):
        """
        Registers the user that every test in this class logs in as
        """
        self.username = "create_new_app_tests"
        register_util(t=self, username=self.username)

    def test_login_template_works(self):
        """
        Tests if login template renders
        """
        resp = self.client.get(reverse("accounts.login"))
        self.assertIn(b"Free Forge Ecosystem", resp.content)

    def test_login_works(self):
        """
        Tests login by username and by email, rejection of wrong credentials
        and the redirect issued by the home view
        """
        c = Client()

        # username login works
        login_util(t=self, c=c, redirect_to="accounts.home")

        # email login works
        payload = {
            "login": self.email,
            "password": self.password,
        }
        resp = c.post(reverse("accounts.login"), payload)
        self.assertEqual(resp.status_code, 302)
        self.assertEqual(resp.headers["location"], reverse("accounts.home"))

        # authentication failure when wrong credentials are provided.
        # self.client is used on purpose: c is already logged in, and the
        # login view redirects authenticated users instead of validating.
        payload = {
            "login": self.email,
            "password": self.user.email,
        }
        resp = self.client.post(reverse("accounts.login"), payload)
        self.assertEqual(resp.status_code, 401)
        self.assertIn(b"Login Failed", resp.content)

        # protected view works
        resp = c.get(reverse("accounts.home"))
        self.assertEqual(resp.status_code, 302)
        self.assertEqual(resp.headers["location"], reverse("dash.home"))

    def test_logout_works(self):
        """
        Logout view tests
        """
        c = Client()
        login_util(t=self, c=c, redirect_to="accounts.home")
        resp = c.get(reverse("accounts.logout"))
        self.assertEqual(resp.status_code, 302)
        self.assertEqual(resp.headers["location"], reverse("accounts.login"))

    def test_default_login_uri_works(self):
        """
        /accounts/login should redirect_to /login
        """
        c = Client()
        resp = c.get(reverse("accounts.default_login_url"))
        self.assertEqual(resp.status_code, 302)
        self.assertEqual(resp.headers["location"], reverse("accounts.login"))

    def test_login_view_redirects_if_user_is_loggedin(self):
        """
        Automatically redirect authenticated users that are visiting login view
        """
        c = Client()
        login_util(t=self, c=c, redirect_to="accounts.home")

        # without a "next" parameter the user is sent to the accounts home
        resp = c.get(reverse("accounts.login"))
        self.assertEqual(resp.status_code, 302)
        self.assertEqual(resp.headers["location"], reverse("accounts.home"))

        resp = c.post(reverse("accounts.login"))
        self.assertEqual(resp.status_code, 302)
        self.assertEqual(resp.headers["location"], reverse("accounts.home"))

        # "next" supplied in the query string is honoured
        resp = c.get(
            f"{reverse('accounts.login')}?next={reverse('dash.instances.list')}"
        )
        self.assertEqual(resp.status_code, 302)
        self.assertEqual(resp.headers["location"], reverse("dash.instances.list"))

        # "next" supplied in the POST body is honoured
        ctx = {"next": reverse("dash.instances.list")}
        resp = c.post(reverse("accounts.login"), ctx)
        self.assertEqual(resp.status_code, 302)
        self.assertEqual(resp.headers["location"], reverse("dash.instances.list"))
class RegistrationTest(TestCase):
    """
    Tests the registration view and the email verification flow
    """

    def setUp(self):
        """
        Stores the credentials used by the registration requests
        """
        self.username = "register_user"
        self.password = "2i3j4;1qlk2asdf"
        self.email = "register_user@example.com"

    def test_register_template_works(self):
        """
        Tests if register template renders
        """
        resp = self.client.get(reverse("accounts.register"))
        self.assertIn(b"Free Forge Ecosystem", resp.content)

    def test_register_works(self):
        """
        Tests if register works
        """
        c = Client()

        # passwords don't match
        msg = {
            "username": self.username,
            "password": self.password,
            "email": self.email,
            "confirm_password": self.email,
        }
        resp = c.post(reverse("accounts.register"), msg)
        self.assertEqual(resp.status_code, 400)

        # register user
        msg["confirm_password"] = self.password
        resp = c.post(reverse("accounts.register"), msg)
        self.assertEqual(resp.status_code, 302)
        user = get_user_model().objects.get(username=self.username)
        self.assertFalse(user.is_active)
        challenge = AccountConfirmChallenge.objects.get(owned_by=user)

        # verify if email is sent
        self.assertEqual(len(mail.outbox), 1)
        self.assertIn(challenge.verification_link(), mail.outbox[0].body)
        self.assertEqual(mail.outbox[0].to, [self.email])

        pending_url = challenge.pending_url()
        self.assertEqual(resp.headers["location"], pending_url)
        resend_url = reverse("accounts.verify.resend", args=(challenge.public_ref,))

        # visit pending URL
        resp = c.get(pending_url)
        self.assertEqual(resp.status_code, 200)
        self.assertIn(self.email.encode(), resp.content)
        self.assertIn(resend_url.encode(), resp.content)

        resp = c.post(resend_url)
        self.assertEqual(resp.status_code, 302)
        self.assertEqual(resp.headers["location"], pending_url)

        # check resend
        self.assertEqual(len(mail.outbox), 2)

        resp = c.get(challenge.verification_link())
        self.assertEqual(resp.status_code, 200)
        self.assertIn(challenge.verification_link().encode(), resp.content)

        # check verification link in email
        resp = c.post(challenge.verification_link())
        self.assertEqual(resp.status_code, 302)
        self.assertEqual(resp.headers["location"], reverse("accounts.login"))
        user.refresh_from_db()
        self.assertTrue(user.is_active)

        old_email = msg["email"]

        # duplicate username
        msg["email"] = "new_register_user_email@example.com"
        resp = c.post(reverse("accounts.register"), msg)
        self.assertEqual(resp.status_code, 400)
        self.assertIn(b"Username is already registered", resp.content)

        # duplicate email
        msg["username"] = "new_register_user_email"
        msg["email"] = old_email
        resp = c.post(reverse("accounts.register"), msg)
        self.assertEqual(resp.status_code, 400)
        self.assertIn(b"Email is already registered", resp.content)
class UnverifiedAccountCleanupTets(TestCase):
    """
    Tests account clean up management command
    """

    @override_settings(HOSTEA={"ACCOUNTS": {"MAX_VERIFICATION_TOLERANCE_PERIOD": 0}})
    def test_register_works(self):
        """
        Registers two users, activates only the first, runs the cleanup
        command and checks that only the still-unverified user is removed
        """
        c = Client()
        username1 = "cleanup_user1"
        username2 = "cleanup_user2"

        # valid registration payload (password and confirmation match)
        msg = {
            "username": username1,
            "password": "asdklfja;ldkfja;df",
            "email": f"{username1}@example.com",
            "confirm_password": "asdklfja;ldkfja;df",
        }

        # register first user
        resp = c.post(reverse("accounts.register"), msg)
        self.assertEqual(resp.status_code, 302)

        # register second user
        msg["username"] = username2
        msg["email"] = f"{username2}@example.org"
        resp = c.post(reverse("accounts.register"), msg)
        self.assertEqual(resp.status_code, 302)

        # activate the first user directly so that cleanup must keep it
        user1 = get_user_model().objects.get(username=username1)
        user1.is_active = True
        user1.save()

        # guard against a long sleep below if the overridden value is changed
        if settings.HOSTEA["ACCOUNTS"]["MAX_VERIFICATION_TOLERANCE_PERIOD"] > 10:
            raise Exception(
                "This test requires MAX_VERIFICATION_TOLERANCE_PERIOD to be less, ideally less 10"
            )

        # wait until the tolerance period (plus a margin of 2) has elapsed
        time.sleep(settings.HOSTEA["ACCOUNTS"]["MAX_VERIFICATION_TOLERANCE_PERIOD"] + 2)

        cmd = CleanUnverifiedUsersCommand()
        cmd.handle()

        self.assertTrue(get_user_model().objects.filter(username=username1).exists())
        self.assertFalse(get_user_model().objects.filter(username=username2).exists())
class SudoWorks(TestCase):
    """
    Tests the sudo (confirm access) view
    """

    def setUp(self):
        """
        Registers the user that confirms access in the test
        """
        self.username = "sudo_useworks"
        register_util(t=self, username=self.username)

    def test_sudo_renders(self):
        """
        Tests that the sudo view requires login, renders for a logged in
        user, accepts the right password and rejects a wrong one
        """
        c = Client()

        # anonymous users are redirected to login
        resp = c.get(reverse("accounts.sudo"))
        self.assertEqual(resp.status_code, 302)
        self.assertEqual(
            resp.headers["location"],
            f"/accounts/login/?next={reverse('accounts.sudo')}",
        )

        login_util(t=self, c=c, redirect_to="accounts.home")

        # GET sudo page
        ctx = {"next": reverse("accounts.home")}
        sudo_path = f"{reverse('accounts.sudo')}?{urlencode(ctx)}"
        resp = c.get(sudo_path)
        self.assertEqual(resp.status_code, 200)
        self.assertIn(b"Please login to confirm access", resp.content)

        # Success sudo validation
        payload = {"password": self.password, "next": ctx["next"]}
        resp = c.post(reverse("accounts.sudo"), payload)
        self.assertEqual(resp.status_code, 302)
        self.assertEqual(resp.headers["location"], ctx["next"])

        # Fail sudo validation
        payload["password"] = self.username
        resp = c.post(reverse("accounts.sudo"), payload)
        self.assertEqual(resp.status_code, 401)
        self.assertIn(b"Wrong Password", resp.content)
# SUDO_TTL value that ConfirmAccessDecorator.test_is_valid installs through
# override_settings. That test calls time.sleep(SUDO_TTL + 2), so keep this
# small (TTL unit is presumably seconds -- confirm against ConfirmAccess).
max_sudo_ttl = 5
class MockRequest:
    """
    Minimal stand-in for a request object, exposing only ``path`` and ``session``
    """

    def __init__(self, path, session=None):
        """
        Stores ``path`` and ``session`` on the instance.

        When ``session`` is omitted every instance gets its own new dict; a
        shared ``{}`` default would leak session data from one test to another.
        """
        self.path = path
        self.session = {} if session is None else session
class ConfirmAccessDecorator(TestCase):
    """
    Tests ConfirmAccess helpers and the confirm_access decorator

    Every test passes its own empty session dict to MockRequest so that sudo
    state written by one test cannot be observed by another.
    """

    # TODO: override to test TTL

    def test_redirect_to_sudo(self):
        """
        redirect_to_sudo should redirect to the sudo view with "next" set to
        the request path
        """
        request = MockRequest(path="/", session={})
        resp = ConfirmAccess.redirect_to_sudo(request)
        self.assertEqual(resp.status_code, 302)
        ctx = {"next": request.path}
        self.assertEqual(
            resp.headers["location"], f"{reverse('accounts.sudo')}?{urlencode(ctx)}"
        )

    @override_settings(HOSTEA={"ACCOUNTS": {"SUDO_TTL": max_sudo_ttl}})
    def test_is_valid(self):
        """
        is_valid should be false before authorization, true right after
        ConfirmAccess.set and false again once SUDO_TTL has elapsed
        """
        request = MockRequest(path="/", session={})

        # request doesn't have sudo authorization data
        self.assertFalse(ConfirmAccess.is_valid(request))

        # authorize sudo
        ConfirmAccess.set(request)

        # request has sudo authorization data and is valid for this time duration
        self.assertTrue(ConfirmAccess.is_valid(request))

        # wait past the TTL (plus a margin of 2) so the authorization expires
        time.sleep(settings.HOSTEA["ACCOUNTS"]["SUDO_TTL"] + 2)

        # request has sudo authorization data and is not valid for this time duration
        self.assertFalse(ConfirmAccess.is_valid(request))

    def test_validate_decorator_cls_method(self):
        """
        ConfirmAccess.validate_decorator should redirect to sudo when the
        request is not authorized and call the wrapped function when it is
        """
        req = MockRequest(path="/", session={})

        def fn(req, *args, **kwargs):
            return True

        # no extra positional or keyword arguments are forwarded
        args = ()
        kwargs = {}

        # request doesn't have sudo authorization data and is not valid for this time duration
        resp = ConfirmAccess.validate_decorator(req, fn, *args, **kwargs)
        self.assertEqual(resp.status_code, 302)
        ctx = {"next": req.path}
        self.assertEqual(
            resp.headers["location"], f"{reverse('accounts.sudo')}?{urlencode(ctx)}"
        )

        # authorize sudo
        ConfirmAccess.set(req)

        # request has sudo authorization data and is valid for this time duration
        self.assertTrue(ConfirmAccess.validate_decorator(req, fn, *args, **kwargs))

    def test_validate_decorator(self):
        """
        A function wrapped with @confirm_access should redirect to sudo when
        the request is not authorized and run normally when it is
        """
        req = MockRequest(path="/", session={})

        @confirm_access
        def fn(req):
            return True

        # request doesn't have sudo authorization data and is not valid for this time duration
        resp = fn(req)
        self.assertEqual(resp.status_code, 302)
        ctx = {"next": req.path}
        self.assertEqual(
            resp.headers["location"], f"{reverse('accounts.sudo')}?{urlencode(ctx)}"
        )

        # authorize sudo
        ConfirmAccess.set(req)

        # request has sudo authorization data and is valid for this time duration
        self.assertTrue(fn(req))
class CreateOidCApplicaiton(TestCase):
    """
    Test command: manage.py create_oidc
    """

    def setUp(self):
        """
        Registers the user that will own the created application
        """
        self.username = "oidcadmin"
        register_util(t=self, username=self.username)

    def test_cmd(self):
        """
        Runs create_oidc for an existing user and checks the stored application
        """
        Application = get_application_model()

        stdout = StringIO()
        stderr = StringIO()
        redirect_uri = "http://example.org"
        app_name = "test_cmd_oidc"

        # username exists
        call_command(
            "create_oidc",
            app_name,
            self.username,
            redirect_uri,
            stdout=stdout,
            stderr=stderr,
        )
        out = stdout.getvalue()
        self.assertIn(f"New application {app_name} created successfully", out)

        # the client ID is read as the second space-separated token on the
        # second line of the command output
        client_id = out.split("\n")[1].split(" ")[1]
        self.assertTrue(Application.objects.filter(client_id=client_id).exists())

        app = Application.objects.get(name=app_name)
        self.assertEqual(app.client_id, client_id)
        self.assertEqual(app.name, app_name)
        self.assertEqual(app.user_id, self.user.id)
        self.assertEqual(app.redirect_uris, redirect_uri)
        self.assertTrue(app.skip_authorization)
        self.assertEqual(app.client_type, "confidential")
        self.assertEqual(app.authorization_grant_type, "authorization-code")
        self.assertEqual(app.algorithm, "HS256")