349 lines
11 KiB
Python
349 lines
11 KiB
Python
# Copyright © 2022 Aravinth Manivannan <realaravinth@batsense.net>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
# License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
from django.shortcuts import render, redirect, get_object_or_404
|
|
from django.contrib.auth.password_validation import validate_password
|
|
from django.core.exceptions import ValidationError
|
|
from django.utils.http import urlencode
|
|
from django.contrib.auth import authenticate, login, logout
|
|
from django.contrib.auth import get_user_model
|
|
from django.contrib.auth.decorators import login_required
|
|
from django.http import HttpResponse
|
|
from django.views.decorators.csrf import csrf_protect
|
|
from django.urls import reverse
|
|
|
|
from dash.utils import footer_ctx
|
|
|
|
from .models import AccountConfirmChallenge, PasswordResetChallenge
|
|
from .utils import send_verification_email, ConfirmAccess, send_password_reset_email
|
|
from .decorators import redirect_if_authenticated
|
|
|
|
|
|
@redirect_if_authenticated
|
|
@csrf_protect
|
|
def login_view(request):
|
|
def default_login_ctx():
|
|
return {
|
|
"title": "Login",
|
|
"footer": footer_ctx(),
|
|
}
|
|
|
|
if request.method == "GET":
|
|
ctx = default_login_ctx()
|
|
if "next" in request.GET:
|
|
ctx["next"] = request.GET["next"]
|
|
return render(request, "accounts/auth/login.html", ctx)
|
|
|
|
login_cred = request.POST["login"]
|
|
user = None
|
|
|
|
if "@" in login_cred:
|
|
user = get_user_model().objects.get(email=login_cred)
|
|
if user is not None:
|
|
user = authenticate(
|
|
username=user.username,
|
|
password=request.POST["password"],
|
|
)
|
|
else:
|
|
user = authenticate(
|
|
username=login_cred,
|
|
password=request.POST["password"],
|
|
)
|
|
|
|
if user is not None:
|
|
login(request, user)
|
|
if "next" in request.POST:
|
|
next_url = request.POST["next"]
|
|
if all([next_url, len(next_url) > 0]):
|
|
return redirect(next_url)
|
|
return redirect(reverse("accounts.home"))
|
|
|
|
ctx = default_login_ctx()
|
|
ctx["error"] = {
|
|
"title": "Login Failed",
|
|
"reason": "Username or passwrod is incorrect, please try again.",
|
|
}
|
|
return render(request, "accounts/auth/login.html", status=401, context=ctx)
|
|
|
|
|
|
@login_required
|
|
def protected_view(request):
|
|
return redirect(reverse("dash.home"))
|
|
|
|
|
|
@redirect_if_authenticated
|
|
def default_login_url(request):
|
|
if "next" in request.GET:
|
|
ctx = {"next": request.GET["next"]}
|
|
return redirect(f"{reverse('accounts.login')}?{urlencode(ctx)}")
|
|
return redirect(reverse("accounts.login"))
|
|
|
|
|
|
@login_required
|
|
def logout_view(request):
|
|
logout(request)
|
|
return redirect(reverse("accounts.login"))
|
|
|
|
|
|
@redirect_if_authenticated
|
|
@csrf_protect
|
|
def register_view(request):
|
|
def default_register_ctx(username=None, email=None):
|
|
return {
|
|
"title": "Register",
|
|
"username": username,
|
|
"email": username,
|
|
"footer": footer_ctx(),
|
|
}
|
|
|
|
if request.method == "GET":
|
|
ctx = default_register_ctx()
|
|
if "next" in request.GET:
|
|
ctx["next"] = request.GET["next"]
|
|
return render(request, "accounts/auth/register.html", ctx)
|
|
|
|
confirm_password = request.POST["confirm_password"]
|
|
password = request.POST["password"]
|
|
username = request.POST["username"]
|
|
email = request.POST["email"]
|
|
|
|
if password != confirm_password:
|
|
ctx = default_register_ctx(username=username, email=email)
|
|
ctx["error"] = {
|
|
"title": "Registration Failed",
|
|
"reason": "Passwords don't match, please verify input",
|
|
}
|
|
return render(request, "accounts/auth/register.html", status=400, context=ctx)
|
|
|
|
User = get_user_model()
|
|
if User.objects.filter(email=email).exists():
|
|
ctx = default_register_ctx(username=username, email=email)
|
|
ctx["error"] = {
|
|
"title": "Registration Failed",
|
|
"reason": "Email is already registered",
|
|
}
|
|
return render(request, "accounts/auth/register.html", status=400, context=ctx)
|
|
|
|
if User.objects.filter(username=username).exists():
|
|
ctx = default_register_ctx(username=username, email=email)
|
|
ctx["error"] = {
|
|
"title": "Registration Failed",
|
|
"reason": "Username is already registered",
|
|
}
|
|
return render(request, "accounts/auth/register.html", status=400, context=ctx)
|
|
|
|
user = get_user_model()(
|
|
username=username,
|
|
email=email,
|
|
is_active=False,
|
|
) # TODO: get email from settings.py
|
|
user.set_password(password)
|
|
try:
|
|
user.full_clean()
|
|
validate_password(password, user=user)
|
|
except ValidationError as err:
|
|
ctx = default_register_ctx(username=username, email=email)
|
|
reason = ""
|
|
for r in err:
|
|
reason += r + " "
|
|
|
|
ctx["error"] = {"title": "Registration Failed", "reason": reason}
|
|
return render(request, "accounts/auth/register.html", status=400, context=ctx)
|
|
|
|
user.is_active = False
|
|
user.save()
|
|
|
|
challenge = None
|
|
|
|
if not AccountConfirmChallenge.objects.filter(owned_by=user).exists():
|
|
challenge = AccountConfirmChallenge(owned_by=user)
|
|
challenge.save()
|
|
send_verification_email(request, challenge=challenge)
|
|
else:
|
|
challenge = AccountConfirmChallenge.objects.get(owned_by=user)
|
|
|
|
return redirect(challenge.pending_url())
|
|
|
|
|
|
@redirect_if_authenticated
|
|
def verification_pending_view(request, public_ref):
|
|
challenge = get_object_or_404(AccountConfirmChallenge, public_ref=public_ref)
|
|
|
|
ctx = {
|
|
"email": challenge.owned_by.email,
|
|
"public_ref": challenge.public_ref,
|
|
}
|
|
|
|
return render(request, "accounts/auth/verification-pending.html", context=ctx)
|
|
|
|
|
|
@redirect_if_authenticated
|
|
def resend_verification_email_view(request, public_ref):
|
|
challenge = get_object_or_404(AccountConfirmChallenge, public_ref=public_ref)
|
|
send_verification_email(request, challenge=challenge)
|
|
return redirect(challenge.pending_url())
|
|
|
|
|
|
def verify_account(request, challenge):
|
|
challenge = get_object_or_404(AccountConfirmChallenge, challenge_text=challenge)
|
|
|
|
if request.method == "GET":
|
|
ctx = {
|
|
"challenge": challenge,
|
|
}
|
|
return render(request, "accounts/auth/verify.html", context=ctx)
|
|
|
|
challenge.owned_by.is_active = True
|
|
challenge.owned_by.save()
|
|
challenge.delete()
|
|
return redirect("accounts.login")
|
|
|
|
|
|
@login_required
|
|
@csrf_protect
|
|
def sudo(request):
|
|
def default_login_ctx():
|
|
return {
|
|
"title": "Confirm Access",
|
|
"footer": footer_ctx(),
|
|
}
|
|
|
|
if request.method == "GET":
|
|
ctx = default_login_ctx()
|
|
ctx["next"] = request.GET["next"]
|
|
return render(request, "accounts/auth/sudo.html", ctx)
|
|
|
|
password = request.POST["password"]
|
|
user = request.user
|
|
|
|
user = authenticate(
|
|
username=user.username,
|
|
password=request.POST["password"],
|
|
)
|
|
if user is None:
|
|
ctx = default_login_ctx()
|
|
ctx["next"] = request.POST["next"]
|
|
ctx["error"] = {
|
|
"title": "Wrong Password",
|
|
"reason": "Password is incorrect, please try again.",
|
|
}
|
|
return render(request, "accounts/auth/sudo.html", status=401, context=ctx)
|
|
|
|
ConfirmAccess.set(request=request)
|
|
return redirect(request.POST["next"])
|
|
|
|
|
|
@redirect_if_authenticated
|
|
@csrf_protect
|
|
def password_reset_send_verificaiton_link(request):
|
|
def default_ctx():
|
|
return {
|
|
"title": "Reset Password",
|
|
"footer": footer_ctx(),
|
|
}
|
|
|
|
if request.method == "GET":
|
|
ctx = default_ctx()
|
|
return render(request, "accounts/auth/password-reset-form.html", ctx)
|
|
|
|
challenge = None
|
|
|
|
email = request.POST["email"]
|
|
User = get_user_model()
|
|
user = get_object_or_404(User, email=email)
|
|
if not PasswordResetChallenge.objects.filter(owned_by=user).exists():
|
|
challenge = PasswordResetChallenge(owned_by=user)
|
|
challenge.save()
|
|
send_password_reset_email(request, challenge=challenge)
|
|
else:
|
|
challenge = PasswordResetChallenge.objects.get(owned_by=user)
|
|
return redirect(challenge.pending_url())
|
|
|
|
|
|
@redirect_if_authenticated
|
|
@csrf_protect
|
|
def password_resend_verification_link_pending(request, public_ref):
|
|
challenge = get_object_or_404(PasswordResetChallenge, public_ref=public_ref)
|
|
|
|
if request.method == "GET":
|
|
ctx = {
|
|
"email": challenge.owned_by.email,
|
|
"public_ref": challenge.public_ref,
|
|
}
|
|
|
|
return render(
|
|
request,
|
|
"accounts/auth/password-reset-resend-verification.html",
|
|
context=ctx,
|
|
)
|
|
|
|
send_password_reset_email(request, challenge=challenge)
|
|
ctx = {
|
|
"email": challenge.owned_by.email,
|
|
"public_ref": challenge.public_ref,
|
|
}
|
|
|
|
return render(
|
|
request, "accounts/auth/password-reset-resend-verification.html", context=ctx
|
|
)
|
|
|
|
|
|
@csrf_protect
|
|
def reset_password(request, challenge):
|
|
def default_ctx(challenge):
|
|
return {
|
|
"title": "Reset Password",
|
|
"footer": footer_ctx(),
|
|
"challenge": challenge,
|
|
}
|
|
|
|
challenge = get_object_or_404(PasswordResetChallenge, challenge_text=challenge)
|
|
|
|
if request.method == "GET":
|
|
ctx = default_ctx(challenge=challenge)
|
|
return render(request, "accounts/auth/password-reset.html", ctx)
|
|
|
|
confirm_password = request.POST["confirm_password"]
|
|
password = request.POST["password"]
|
|
|
|
if password != confirm_password:
|
|
ctx = default_ctx(challenge=challenge)
|
|
ctx["error"] = {
|
|
"title": "Reset Password Failed",
|
|
"reason": "Passwords don't match, please verify input",
|
|
}
|
|
return render(
|
|
request, "accounts/auth/password-reset.html", status=400, context=ctx
|
|
)
|
|
|
|
user = challenge.owned_by
|
|
try:
|
|
validate_password(password, user=user)
|
|
except ValidationError as err:
|
|
ctx = default_ctx(challenge=challenge)
|
|
reason = ""
|
|
for r in err:
|
|
reason += r + " "
|
|
|
|
ctx["error"] = {"title": "Reset Password Failed", "reason": reason}
|
|
return render(
|
|
request, "accounts/auth/password-reset.html", status=400, context=ctx
|
|
)
|
|
|
|
user.set_password(password)
|
|
user.save()
|
|
challenge.delete()
|
|
send_password_reset_email(request)
|
|
return redirect("accounts.login")
|