547 lines
18 KiB
Python
547 lines
18 KiB
Python
# Create your tests here.
|
|
|
|
# Copyright © 2022 Aravinth Manivannan <realaravinth@batsense.net>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
# License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
import time
|
|
import os
|
|
from io import StringIO
|
|
from urllib.parse import urlparse, urlunparse
|
|
|
|
import requests
|
|
|
|
from django.core import mail
|
|
from django.contrib.auth import get_user_model
|
|
from django.core.management import call_command
|
|
from django.urls import reverse
|
|
from django.test import TestCase, Client, override_settings
|
|
from django.utils.http import urlencode
|
|
from django.contrib.auth import authenticate
|
|
from django.conf import settings
|
|
|
|
|
|
from oauth2_provider.models import get_application_model
|
|
|
|
from .models import AccountConfirmChallenge, PasswordResetChallenge
|
|
from .management.commands.rm_unverified_users import (
|
|
Command as CleanUnverifiedUsersCommand,
|
|
)
|
|
from .utils import ConfirmAccess
|
|
from .decorators import confirm_access
|
|
|
|
|
|
def register_util(t: TestCase, username: str):
|
|
t.password = "asdklfja;ldkfja;df"
|
|
t.username = username
|
|
t.email = f"{t.username}@example.org"
|
|
t.user = get_user_model().objects.create(
|
|
username=t.username,
|
|
email=t.email,
|
|
)
|
|
t.user.set_password(t.password)
|
|
t.user.save()
|
|
|
|
|
|
def login_util(t: TestCase, c: Client, redirect_to: str):
|
|
payload = {
|
|
"login": t.username,
|
|
"password": t.password,
|
|
}
|
|
resp = c.post(reverse("accounts.login"), payload)
|
|
t.assertEqual(resp.status_code, 302)
|
|
t.assertEqual(resp.headers["location"], reverse(redirect_to))
|
|
|
|
|
|
class LoginTest(TestCase):
|
|
"""
|
|
Tests create new app view
|
|
"""
|
|
|
|
def setUp(self):
|
|
self.username = "create_new_app_tests"
|
|
register_util(t=self, username=self.username)
|
|
|
|
def test_login_template_works(self):
|
|
"""
|
|
Tests if login template renders
|
|
"""
|
|
resp = self.client.get(reverse("accounts.login"))
|
|
self.assertEqual(
|
|
b"A free forge ecosystem for free developers" in resp.content, True
|
|
)
|
|
|
|
def test_login_works(self):
|
|
"""
|
|
Tests if login template renders
|
|
"""
|
|
c = Client()
|
|
|
|
# username login works
|
|
login_util(t=self, c=c, redirect_to="accounts.home")
|
|
|
|
# email login works
|
|
payload = {
|
|
"login": self.email,
|
|
"password": self.password,
|
|
}
|
|
resp = c.post(reverse("accounts.login"), payload)
|
|
self.assertEqual(resp.status_code, 302)
|
|
self.assertEqual(resp.headers["location"], reverse("accounts.home"))
|
|
|
|
# authentication failure when wrong credentials are provided
|
|
payload = {
|
|
"login": self.email,
|
|
"password": self.user.email,
|
|
}
|
|
|
|
resp = self.client.post(reverse("accounts.login"), payload)
|
|
self.assertEqual(resp.status_code, 401)
|
|
self.assertEqual(b"Login Failed" in resp.content, True)
|
|
|
|
# protected view works
|
|
resp = c.get(reverse("accounts.home"))
|
|
self.assertEqual(resp.status_code, 302)
|
|
self.assertEqual(resp.headers["location"], reverse("dash.home"))
|
|
|
|
def test_logout_works(self):
|
|
"""
|
|
Logout view tests
|
|
"""
|
|
c = Client()
|
|
login_util(t=self, c=c, redirect_to="accounts.home")
|
|
resp = c.get(reverse("accounts.logout"))
|
|
self.assertEqual(resp.status_code, 302)
|
|
self.assertEqual(resp.headers["location"], reverse("accounts.login"))
|
|
|
|
def test_default_login_uri_works(self):
|
|
"""
|
|
/accounts/login should redirect_to /login
|
|
"""
|
|
c = Client()
|
|
resp = c.get(reverse("accounts.default_login_url"))
|
|
self.assertEqual(resp.status_code, 302)
|
|
self.assertEqual(resp.headers["location"], reverse("accounts.login"))
|
|
|
|
def test_login_view_redirects_if_user_is_loggedin(self):
|
|
"""
|
|
Automatically redirect authenticated users that are visiting login view
|
|
"""
|
|
c = Client()
|
|
login_util(t=self, c=c, redirect_to="accounts.home")
|
|
|
|
resp = c.get(reverse("accounts.login"))
|
|
self.assertEqual(resp.status_code, 302)
|
|
self.assertEqual(resp.headers["location"], reverse("accounts.home"))
|
|
|
|
resp = c.post(reverse("accounts.login"))
|
|
self.assertEqual(resp.status_code, 302)
|
|
self.assertEqual(resp.headers["location"], reverse("accounts.home"))
|
|
|
|
resp = c.get(
|
|
f"{reverse('accounts.login')}?next={reverse('dash.instances.list')}"
|
|
)
|
|
self.assertEqual(resp.status_code, 302)
|
|
self.assertEqual(resp.headers["location"], reverse("dash.instances.list"))
|
|
|
|
ctx = {"next": reverse("dash.instances.list")}
|
|
resp = c.post(reverse("accounts.login"), ctx)
|
|
self.assertEqual(resp.status_code, 302)
|
|
self.assertEqual(resp.headers["location"], reverse("dash.instances.list"))
|
|
|
|
|
|
class ResetPasswordTest(TestCase):
|
|
def setUp(self):
|
|
self.username = "reset_password_user"
|
|
register_util(t=self, username=self.username)
|
|
|
|
def reset_password(self):
|
|
c = Client()
|
|
payload = {
|
|
"email": self.email,
|
|
}
|
|
resp = c.get(reverse("accounts.password.reset.new"))
|
|
self.assertEqual(resp.status_code == 200)
|
|
|
|
resp = c.post(reverse("accounts.password.reset.new"), payload)
|
|
self.assertEqual(resp.status_code == 302)
|
|
challenge = PasswordResetChallenge.objects.filter(owned_by=self.user)
|
|
self.assertEqual(resp.headers["location"] == challenge.pending_url(), True)
|
|
|
|
password_reset_mail = mail.outbox.pop()
|
|
self.assertEqual("reset your password" in password_reset_mail, True)
|
|
self.assertEqual(challenge.verification_link() in password_reset_mail, True)
|
|
|
|
resp = c.get(self.challenge.verification_link())
|
|
self.assertEqual(resp.status_code == 200)
|
|
|
|
new_password = "newpasdasdf234234"
|
|
|
|
# passwords don't match
|
|
payload = {
|
|
"password": new_password,
|
|
"confirm_password": self.password,
|
|
}
|
|
resp = c.post(self.challenge.verification_link(), payload)
|
|
self.assertEqual(resp.status_code == 400)
|
|
|
|
# change password
|
|
payload["confirm_password"] = new_password
|
|
resp = c.post(self.challenge.verification_link(), payload)
|
|
self.assertEqual(resp.status_code == 302)
|
|
self.assertEqual(resp.headers["location"], reverse("accounts.login"))
|
|
|
|
# verify password changed notification email was sent
|
|
password_updated_email = mail.outbox.pop()
|
|
self.assertEqual(
|
|
"Your password for signing in to Hostea was recently changed. If you made this change, then we're all set."
|
|
in password_updated_email,
|
|
True,
|
|
)
|
|
self.assertEqual(reverse("accounts.reset.new") in password_updated_email, True)
|
|
|
|
# trying to login with old password
|
|
payload = {
|
|
"login": self.username,
|
|
"password": self.password,
|
|
}
|
|
resp = self.client.post(reverse("accounts.login"), payload)
|
|
self.assertEqual(resp.status_code, 401)
|
|
self.assertEqual(b"Login Failed" in resp.content, True)
|
|
|
|
payload["password"] = new_password
|
|
resp = c.post(reverse("accounts.login"), payload)
|
|
self.assertEqual(resp.status_code, 302)
|
|
self.assertEqual(resp.headers["location"], reverse("accounts.home"))
|
|
|
|
|
|
class RegistrationTest(TestCase):
|
|
def setUp(self):
|
|
self.username = "register_user"
|
|
self.password = "2i3j4;1qlk2asdf"
|
|
self.email = "register_user@example.com"
|
|
|
|
def test_register_template_works(self):
|
|
"""
|
|
Tests if register template renders
|
|
"""
|
|
resp = self.client.get(reverse("accounts.register"))
|
|
self.assertEqual(
|
|
b"A free forge ecosystem for free developers." in resp.content, True
|
|
)
|
|
|
|
def test_register_works(self):
|
|
"""
|
|
Tests if register works
|
|
"""
|
|
c = Client()
|
|
|
|
# passwords don't match
|
|
msg = {
|
|
"username": self.username,
|
|
"password": self.password,
|
|
"email": self.email,
|
|
"confirm_password": self.email,
|
|
}
|
|
resp = c.post(reverse("accounts.register"), msg)
|
|
self.assertEqual(resp.status_code, 400)
|
|
|
|
# register user
|
|
msg["confirm_password"] = self.password
|
|
resp = c.post(reverse("accounts.register"), msg)
|
|
self.assertEqual(resp.status_code, 302)
|
|
|
|
user = get_user_model().objects.get(username=self.username)
|
|
self.assertEqual(user.is_active, False)
|
|
challenge = AccountConfirmChallenge.objects.get(owned_by=user)
|
|
|
|
# verify is email is sent
|
|
self.assertEqual(len(mail.outbox), 1)
|
|
self.assertEqual(challenge.verification_link() in mail.outbox[0].body, True)
|
|
self.assertEqual(mail.outbox[0].to, [self.email])
|
|
|
|
pending_url = challenge.pending_url()
|
|
self.assertEqual(resp.headers["location"], pending_url)
|
|
resend_url = reverse("accounts.verify.resend", args=(challenge.public_ref,))
|
|
|
|
# visit pending URL
|
|
resp = c.get(pending_url)
|
|
self.assertEqual(resp.status_code, 200)
|
|
self.assertEqual(str.encode(self.email) in resp.content, True)
|
|
self.assertEqual(str.encode(resend_url) in resp.content, True)
|
|
|
|
resp = c.post(resend_url)
|
|
self.assertEqual(resp.status_code, 302)
|
|
self.assertEqual(resp.headers["location"], pending_url)
|
|
|
|
# check resend
|
|
self.assertEqual(len(mail.outbox), 2)
|
|
|
|
resp = c.get(challenge.verification_link())
|
|
self.assertEqual(resp.status_code, 200)
|
|
self.assertEqual(
|
|
str.encode(challenge.verification_link()) in resp.content, True
|
|
)
|
|
|
|
# check verification link in email
|
|
resp = c.post(challenge.verification_link())
|
|
|
|
self.assertEqual(resp.status_code, 302)
|
|
self.assertEqual(resp.headers["location"], reverse("accounts.login"))
|
|
user.refresh_from_db()
|
|
self.assertEqual(user.is_active, True)
|
|
|
|
old_email = msg["email"]
|
|
|
|
# duplicate username
|
|
msg["email"] = "new_register_user_email@example.com"
|
|
resp = c.post(reverse("accounts.register"), msg)
|
|
self.assertEqual(resp.status_code, 400)
|
|
self.assertEqual(b"Username is already registered" in resp.content, True)
|
|
|
|
# duplicate email
|
|
msg["username"] = "new_register_user_email"
|
|
msg["email"] = old_email
|
|
resp = c.post(reverse("accounts.register"), msg)
|
|
self.assertEqual(resp.status_code, 400)
|
|
self.assertEqual(b"Email is already registered" in resp.content, True)
|
|
|
|
|
|
class UnverifiedAccountCleanupTets(TestCase):
|
|
"""
|
|
Tests account clean up management command
|
|
"""
|
|
|
|
@override_settings(HOSTEA={"ACCOUNTS": {"MAX_VERIFICATION_TOLERANCE_PERIOD": 0}})
|
|
def test_register_works(self):
|
|
"""
|
|
Tests if register works
|
|
"""
|
|
c = Client()
|
|
|
|
username1 = "cleanup_user1"
|
|
username2 = "cleanup_user2"
|
|
|
|
# passwords don't match
|
|
msg = {
|
|
"username": username1,
|
|
"password": "asdklfja;ldkfja;df",
|
|
"email": f"{username1}@example.com",
|
|
"confirm_password": "asdklfja;ldkfja;df",
|
|
}
|
|
|
|
# register user
|
|
resp = c.post(reverse("accounts.register"), msg)
|
|
self.assertEqual(resp.status_code, 302)
|
|
|
|
msg["username"] = username2
|
|
msg["email"] = f"{username2}@example.org"
|
|
resp = c.post(reverse("accounts.register"), msg)
|
|
self.assertEqual(resp.status_code, 302)
|
|
|
|
user1 = get_user_model().objects.get(username=username1)
|
|
user1.is_active = True
|
|
user1.save()
|
|
|
|
if settings.HOSTEA["ACCOUNTS"]["MAX_VERIFICATION_TOLERANCE_PERIOD"] > 10:
|
|
raise Exception(
|
|
"This test requires MAX_VERIFICATION_TOLERANCE_PERIOD to be less, ideally less 10"
|
|
)
|
|
|
|
time.sleep(settings.HOSTEA["ACCOUNTS"]["MAX_VERIFICATION_TOLERANCE_PERIOD"] + 2)
|
|
cmd = CleanUnverifiedUsersCommand()
|
|
cmd.handle()
|
|
|
|
self.assertEqual(
|
|
get_user_model().objects.filter(username=username1).exists(), True
|
|
)
|
|
self.assertEqual(
|
|
get_user_model().objects.filter(username=username2).exists(), False
|
|
)
|
|
|
|
|
|
class SudoWorks(TestCase):
|
|
def setUp(self):
|
|
self.username = "sudo_useworks"
|
|
register_util(t=self, username=self.username)
|
|
|
|
def test_sudo_renders(self):
|
|
c = Client()
|
|
|
|
resp = c.get(reverse("accounts.sudo"))
|
|
self.assertEqual(resp.status_code, 302)
|
|
self.assertEqual(
|
|
resp.headers["location"],
|
|
f"/accounts/login/?next={reverse('accounts.sudo')}",
|
|
)
|
|
|
|
login_util(t=self, c=c, redirect_to="accounts.home")
|
|
|
|
# GET sudo page
|
|
ctx = {"next": reverse("accounts.home")}
|
|
sudo_path = f"{reverse('accounts.sudo')}?{urlencode(ctx)}"
|
|
resp = c.get(sudo_path)
|
|
self.assertEqual(resp.status_code, 200)
|
|
self.assertEqual(b"Please login to confirm access" in resp.content, True)
|
|
|
|
# Success sudo validation
|
|
payload = {"password": self.password, "next": ctx["next"]}
|
|
resp = c.post(reverse("accounts.sudo"), payload)
|
|
self.assertEqual(resp.status_code, 302)
|
|
self.assertEqual(resp.headers["location"], ctx["next"])
|
|
|
|
# Fail sudo validation
|
|
payload["password"] = self.username
|
|
resp = c.post(reverse("accounts.sudo"), payload)
|
|
self.assertEqual(resp.status_code, 401)
|
|
self.assertEqual(b"Wrong Password" in resp.content, True)
|
|
|
|
|
|
max_sudo_ttl = 5
|
|
|
|
|
|
class MockRequest:
|
|
def __init__(self, path, session={}):
|
|
self.path = path
|
|
self.session = session
|
|
|
|
|
|
class ConfirmAccessDecorator(TestCase):
|
|
|
|
# TODO: override to test TTL
|
|
def test_redirect_to_sudo(self):
|
|
request = MockRequest(path="/")
|
|
resp = ConfirmAccess.redirect_to_sudo(request)
|
|
self.assertEqual(resp.status_code, 302)
|
|
ctx = {"next": request.path}
|
|
self.assertEqual(
|
|
resp.headers["location"], f"{reverse('accounts.sudo')}?{urlencode(ctx)}"
|
|
)
|
|
|
|
@override_settings(HOSTEA={"ACCOUNTS": {"SUDO_TTL": max_sudo_ttl}})
|
|
def test_is_valid(self):
|
|
request = MockRequest(path="/")
|
|
|
|
# request doesn't have sudo authorization data
|
|
self.assertEqual(ConfirmAccess.is_valid(request), False)
|
|
|
|
# authorize sudo
|
|
ConfirmAccess.set(request)
|
|
|
|
# request has sudo authorization data and is valid for this time duration
|
|
self.assertEqual(ConfirmAccess.is_valid(request), True)
|
|
|
|
time.sleep(settings.HOSTEA["ACCOUNTS"]["SUDO_TTL"] + 2)
|
|
|
|
# request has sudo authorization data and is not valid for this time duration
|
|
self.assertEqual(ConfirmAccess.is_valid(request), False)
|
|
|
|
def test_validate_decorator_cls_method(self):
|
|
req = MockRequest(path="/")
|
|
req.session = {}
|
|
|
|
def fn(req, *args, **kwargs):
|
|
return True
|
|
|
|
args = {}
|
|
kwargs = {}
|
|
|
|
# request doesn't have sudo authorization data and is not valid for this time duration
|
|
resp = ConfirmAccess.validate_decorator(req, fn, *args, **kwargs)
|
|
self.assertEqual(resp.status_code, 302)
|
|
|
|
ctx = {"next": req.path}
|
|
self.assertEqual(
|
|
resp.headers["location"], f"{reverse('accounts.sudo')}?{urlencode(ctx)}"
|
|
)
|
|
|
|
# authorize sudo
|
|
ConfirmAccess.set(req)
|
|
# request has sudo authorization data and is valid for this time duration
|
|
self.assertEqual(
|
|
ConfirmAccess.validate_decorator(req, fn, *args, **kwargs), True
|
|
)
|
|
|
|
def test_validate_decorator(self):
|
|
req = MockRequest(path="/")
|
|
req.session = {}
|
|
|
|
@confirm_access
|
|
def fn(req):
|
|
return True
|
|
|
|
args = {}
|
|
kwargs = {}
|
|
|
|
# request doesn't have sudo authorization data and is not valid for this time duration
|
|
resp = fn(req)
|
|
self.assertEqual(resp.status_code, 302)
|
|
|
|
ctx = {"next": req.path}
|
|
self.assertEqual(
|
|
resp.headers["location"], f"{reverse('accounts.sudo')}?{urlencode(ctx)}"
|
|
)
|
|
|
|
# authorize sudo
|
|
ConfirmAccess.set(req)
|
|
# request has sudo authorization data and is valid for this time duration
|
|
self.assertEqual(fn(req), True)
|
|
|
|
|
|
class CreateOidCApplicaiton(TestCase):
|
|
"""
|
|
Test command: manage.py create_oidc
|
|
"""
|
|
|
|
def setUp(self):
|
|
self.username = "oidcadmin"
|
|
register_util(t=self, username=self.username)
|
|
|
|
def test_cmd(self):
|
|
|
|
Application = get_application_model()
|
|
|
|
stdout = StringIO()
|
|
stderr = StringIO()
|
|
|
|
redirect_uri = "http://example.org"
|
|
app_name = "test_cmd_oidc"
|
|
|
|
# username exists
|
|
call_command(
|
|
"create_oidc",
|
|
app_name,
|
|
self.username,
|
|
redirect_uri,
|
|
stdout=stdout,
|
|
stderr=stderr,
|
|
)
|
|
out = stdout.getvalue()
|
|
|
|
self.assertIn(f"New application {app_name} created successfully", out)
|
|
|
|
client_id = out.split("\n")[1].split(" ")[1]
|
|
|
|
self.assertEqual(
|
|
get_application_model().objects.filter(client_id=client_id).exists(), True
|
|
)
|
|
app = get_application_model().objects.get(name=app_name)
|
|
self.assertEqual(app.client_id, client_id)
|
|
self.assertEqual(app.name, app_name)
|
|
self.assertEqual(app.user_id, self.user.id)
|
|
self.assertEqual(app.redirect_uris, redirect_uri)
|
|
self.assertEqual(app.skip_authorization, True)
|
|
self.assertEqual(app.client_type, "confidential")
|
|
self.assertEqual(app.authorization_grant_type, "authorization-code")
|
|
self.assertEqual(app.algorithm, "HS256")
|