326 lines
11 KiB
Python
326 lines
11 KiB
Python
# Copyright © 2022 Aravinth Manivannan <realaravinth@batsense.net>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
# License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
import subprocess
|
|
import shutil
|
|
import os
|
|
from time import sleep
|
|
from pathlib import Path
|
|
|
|
|
|
from django.contrib.auth import get_user_model
|
|
from django.utils.http import urlencode
|
|
from django.urls import reverse
|
|
from django.test import TestCase, Client, override_settings
|
|
from django.conf import settings
|
|
from django.db.utils import IntegrityError
|
|
from payments import get_payment_model, RedirectNeeded, PaymentStatus
|
|
|
|
from accounts.tests import login_util, register_util
|
|
|
|
from .models import InstanceConfiguration, Instance
|
|
from .utils import create_instance, sanitize_vm_name, VmErrors, VmException
|
|
|
|
|
|
def create_configurations(t: TestCase):
|
|
t.instance_config = [
|
|
InstanceConfiguration.objects.get(
|
|
name="s1-2", rent=10, ram=2, cpu=1, storage=10
|
|
),
|
|
InstanceConfiguration.objects.get(
|
|
name="s1-4", rent=20, ram=4, cpu=1, storage=20
|
|
),
|
|
InstanceConfiguration.objects.get(
|
|
name="s1-8", rent=40, ram=8, cpu=2, storage=40
|
|
),
|
|
]
|
|
|
|
for instance in t.instance_config:
|
|
print(f"[*][init] Instance {instance.name} is saved")
|
|
t.assertEqual(
|
|
InstanceConfiguration.objects.filter(name=instance.name).exists(), True
|
|
)
|
|
|
|
|
|
def infra_custom_config(test_name: str):
|
|
def create_fleet_repo(test_name: str):
|
|
subprocess.run(
|
|
["./integration/ci.sh", "new_fleet_repo", test_name],
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.STDOUT,
|
|
)
|
|
sleep(10)
|
|
|
|
create_fleet_repo(test_name=test_name)
|
|
|
|
c = settings.HOSTEA
|
|
path = Path(f"/tmp/hostea/dashboard/{test_name}/repo")
|
|
if path.exists():
|
|
shutil.rmtree(path)
|
|
c["INFRA"]["HOSTEA_REPO"]["PATH"] = str(path)
|
|
remote_base = os.environ.get("HOSTEA_INFRA_HOSTEA_REPO_REMOTE")
|
|
c["INFRA"]["HOSTEA_REPO"]["REMOTE"] = f"{remote_base}{test_name}.git"
|
|
print(c["INFRA"]["HOSTEA_REPO"]["REMOTE"])
|
|
return c
|
|
|
|
|
|
def create_instance_util(
|
|
t: TestCase, c: Client, instance_name: str, config: InstanceConfiguration
|
|
):
|
|
payload = {"name": instance_name, "configuration": config.name}
|
|
|
|
resp = c.post(reverse("dash.instances.new"), payload)
|
|
t.assertEqual(resp.status_code, 302)
|
|
t.assertEqual(
|
|
resp.headers["location"],
|
|
reverse("billing.invoice.generate", args=(instance_name,)),
|
|
)
|
|
# generate invoice
|
|
payment_uri = reverse("billing.invoice.generate", args=(instance_name,))
|
|
resp = c.get(payment_uri)
|
|
t.assertEqual(resp.status_code, 302)
|
|
invoice_uri = resp.headers["Location"]
|
|
t.assertEqual("invoice/payment/" in invoice_uri, True)
|
|
|
|
# simulate payment. There's probably a better way to do this
|
|
payment = get_payment_model().objects.get(
|
|
paid_by=t.user, instance_name=instance_name
|
|
)
|
|
payment.status = PaymentStatus.CONFIRMED
|
|
payment.save()
|
|
|
|
resp = c.get(reverse("infra.create", args=(instance_name,)))
|
|
t.assertEqual(resp.status_code, 200)
|
|
|
|
|
|
class DashHome(TestCase):
|
|
"""
|
|
Tests create new app view
|
|
"""
|
|
|
|
def setUp(self):
|
|
register_util(t=self, username="dashboard_home_user")
|
|
|
|
def test_dash_is_protected(self):
|
|
"""
|
|
Tests if dashboard template renders
|
|
"""
|
|
resp = self.client.get(reverse("dash.home"))
|
|
self.assertEqual(resp.status_code, 302)
|
|
|
|
# default LOGIN redirect URI that is used by @login_required decorator is
|
|
# /accounts/login. There's a redirection endpoint at /accounts/login/ that
|
|
# will redirect user to /login. Hence the /accounts prefix
|
|
redirect_login_uri = (
|
|
f"/accounts{reverse('accounts.login')}?next={reverse('dash.home')}"
|
|
)
|
|
self.assertEqual(resp.headers["location"], redirect_login_uri)
|
|
|
|
def test_dash_home_renders(self):
|
|
"""
|
|
Tests if login template renders
|
|
"""
|
|
c = Client()
|
|
|
|
# username login works
|
|
login_util(t=self, c=c, redirect_to="accounts.home")
|
|
|
|
# email login works
|
|
resp = c.get(reverse("dash.home"))
|
|
self.assertEqual(resp.status_code, 200)
|
|
self.assertEqual(b"Billing" in resp.content, True)
|
|
self.assertEqual(b"Support" in resp.content, True)
|
|
self.assertEqual(b"Logout" in resp.content, True)
|
|
|
|
|
|
class InstancesConfig(TestCase):
|
|
"""
|
|
Tests InstancesConfig model
|
|
"""
|
|
|
|
def test_unique_constraint(self):
|
|
"""
|
|
Test configuration uniqueness
|
|
"""
|
|
config1 = InstanceConfiguration(
|
|
name="test config 1", rent=5.0, ram=0.5, cpu=1, storage=0.5
|
|
)
|
|
config1.save()
|
|
config2 = InstanceConfiguration(
|
|
name="test config 2", rent=5.0, ram=0.5, cpu=2, storage=0.5
|
|
)
|
|
config2.save()
|
|
with self.assertRaises(IntegrityError):
|
|
config3 = InstanceConfiguration(
|
|
name="test config 3", rent=5.0, ram=0.5, cpu=1, storage=0.5
|
|
)
|
|
config3.save()
|
|
|
|
def test_default_configuration_is_loaded(self):
|
|
"""
|
|
Expects InstancesConfig titled "s1-2", "s1-4" and "s1-8"
|
|
|
|
ref: https://forgejo.gna.org/Hostea/july-mvp/issues/10#issuecomment-639
|
|
"""
|
|
self.assertEqual(
|
|
InstanceConfiguration.objects.filter(
|
|
name="s1-2", rent=10, ram=2, cpu=1, storage=10
|
|
).exists(),
|
|
True,
|
|
)
|
|
self.assertEqual(
|
|
InstanceConfiguration.objects.filter(
|
|
name="s1-4", rent=20, ram=4, cpu=1, storage=20
|
|
).exists(),
|
|
True,
|
|
)
|
|
self.assertEqual(
|
|
InstanceConfiguration.objects.filter(
|
|
name="s1-8", rent=40, ram=8, cpu=2, storage=40
|
|
).exists(),
|
|
True,
|
|
)
|
|
|
|
|
|
class CreateInstance(TestCase):
|
|
def setUp(self):
|
|
register_util(t=self, username="createinstance_user")
|
|
create_configurations(t=self)
|
|
|
|
|
|
def test_sanitize_vm_name(self):
|
|
self.assertEqual(sanitize_vm_name(vm_name="LOWERname"), "lowername")
|
|
|
|
with self.assertRaises(VmException):
|
|
sanitize_vm_name(vm_name="12345452131324234234234234")
|
|
|
|
with self.assertRaises(VmException):
|
|
sanitize_vm_name(vm_name="122342$#34234")
|
|
|
|
|
|
@override_settings(
|
|
HOSTEA=infra_custom_config(test_name="test_create_instance_util")
|
|
)
|
|
def test_create_instance_util(self):
|
|
configuration = self.instance_config[0].name
|
|
|
|
with self.assertRaises(VmException):
|
|
create_instance(
|
|
vm_name="12345452131324234234234234",
|
|
configuration_name=configuration,
|
|
user=self.user,
|
|
)
|
|
|
|
@override_settings(
|
|
HOSTEA=infra_custom_config(test_name="test_create_instance_renders")
|
|
)
|
|
def test_create_instance_renders(self):
|
|
c = Client()
|
|
login_util(self, c, "accounts.home")
|
|
urls = [(reverse("dash.instances.new"), "Instance Configuration")]
|
|
for (url, test) in urls:
|
|
print(f"[*] Testing URI: {url}")
|
|
resp = c.get(url)
|
|
self.assertEqual(resp.status_code, 200)
|
|
self.assertEqual(b"Billing" in resp.content, True)
|
|
self.assertEqual(b"Support" in resp.content, True)
|
|
self.assertEqual(b"Logout" in resp.content, True)
|
|
self.assertEqual(str.encode(test) in resp.content, True)
|
|
|
|
# create instance
|
|
instance_name = "testirenrs"
|
|
payload = {
|
|
"name": instance_name,
|
|
"configuration": self.instance_config[0].name,
|
|
}
|
|
|
|
self.assertEqual(Instance.objects.filter(name=payload["name"]).exists(), False)
|
|
|
|
create_instance_util(
|
|
t=self, c=c, instance_name=instance_name, config=self.instance_config[0]
|
|
)
|
|
|
|
self.assertEqual(
|
|
Instance.objects.filter(
|
|
name=instance_name,
|
|
owned_by=self.user,
|
|
configuration_id=self.instance_config[0],
|
|
).exists(),
|
|
True,
|
|
)
|
|
instance = Instance.objects.get(name=payload["name"], owned_by=self.user)
|
|
|
|
# try to create instance with duplicate name
|
|
resp = c.post(reverse("dash.instances.new"), payload)
|
|
self.assertEqual(resp.status_code, 400)
|
|
self.assertEqual(b"Instance name exists" in resp.content, True)
|
|
|
|
# try to create instance with a VM configuration that doesn't exist
|
|
payload = {
|
|
"name": f"2{payload['name']}",
|
|
"configuration": f"2{payload['name']}",
|
|
}
|
|
resp = c.post(reverse("dash.instances.new"), payload)
|
|
self.assertEqual(resp.status_code, 400)
|
|
self.assertEqual(b"Configuration doesn" in resp.content, True)
|
|
|
|
# list instances
|
|
resp = c.get(reverse("dash.instances.list"))
|
|
self.assertEqual(resp.status_code, 200)
|
|
self.assertEqual(str.encode(instance.name) in resp.content, True)
|
|
|
|
# view instance details
|
|
resp = c.get(reverse("dash.instances.view", args=(instance.name,)))
|
|
self.assertEqual(resp.status_code, 200)
|
|
self.assertEqual(str.encode(instance.name) in resp.content, True)
|
|
|
|
# delete instance details
|
|
delete_uri = reverse("dash.instances.delete", args=(instance.name,))
|
|
|
|
## will ask for sudo confirmation
|
|
resp = c.get(delete_uri)
|
|
self.assertEqual(resp.status_code, 302)
|
|
ctx = {"next": delete_uri}
|
|
self.assertEqual(
|
|
resp.headers["location"], f"{reverse('accounts.sudo')}?{urlencode(ctx)}"
|
|
)
|
|
|
|
## give sudo confirmation
|
|
payload = {"password": self.password, "next": ctx["next"]}
|
|
resp = c.post(reverse("accounts.sudo"), payload)
|
|
self.assertEqual(resp.status_code, 302)
|
|
self.assertEqual(resp.headers["location"], ctx["next"])
|
|
|
|
resp = c.get(delete_uri)
|
|
self.assertEqual(resp.status_code, 200)
|
|
self.assertEqual(
|
|
str.encode(f"to delete VM {instance.name}") in resp.content, True
|
|
)
|
|
|
|
resp = c.post(delete_uri)
|
|
self.assertEqual(resp.status_code, 302)
|
|
self.assertEqual(
|
|
resp.headers["location"], reverse("infra.rm", args=(instance.name,))
|
|
)
|
|
resp = c.get(resp.headers["location"])
|
|
self.assertEqual(resp.status_code, 302)
|
|
self.assertEqual(resp.headers["location"], reverse("dash.instances.list"))
|
|
self.assertEqual(
|
|
Instance.objects.filter(
|
|
name=instance_name,
|
|
owned_by=self.user,
|
|
).exists(),
|
|
False,
|
|
)
|