feat: integration testing

SUMMARY
    integration/__main__.py is a CLI-based HTTP client that can interact
    with Hostea Dashboard and Gitea.

    Integration tests are run via integration/tests.sh, which is a
    driver for the HTTP client at integration/__main__.py. The script is
    capable of spinning up a test environment consisting of services
    defined in docker-compose-dev-deps.yml and the Hostea Dashboard and
    tearing it down after a successful run.

    The credentials used to create various accounts and other parameters
    are all defined in integration/tests.sh script it self. So it is
    self contained.

CLIENT FUNCTIONALITY:

    HOSTEA DASHBOARD:
	 - register user with email verification
	 - login
	 - create OIDC app
	 - visit support page

    GITEA:
	 - Install Gitea(DB configuration, etc. The first form that's
	 presented to the visitor after a new instance is deployed)
	 - Register User
	 - Login User
	 - Create repository
	 - Configure OIDC SSO
	 - Login via SSO
wip-infra-tests
Aravinth Manivannan 2022-06-23 20:59:22 +05:30
parent 3fb756bd12
commit 798a2f03d9
Signed by: realaravinth
GPG Key ID: AD9F0F08E855ED88
9 changed files with 1065 additions and 276 deletions

View File

@ -32,6 +32,9 @@ freeze: ## Freeze python dependencies
help: ## Prints help for targets with comments
@cat $(MAKEFILE_LIST) | grep -E '^[a-zA-Z_-]+:.*?## .*$$' | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'
integration-test: ## run integration tests
. ./venv/bin/activate && integration/tests.sh
lint: ## Run linter
@./venv/bin/black ./dashboard/
@./venv/bin/black ./accounts/

0
integration/__init__.py Normal file
View File

13
integration/__main__.py Normal file
View File

@ -0,0 +1,13 @@
import argparse
from .cli import Cli
def admin(args):
print(args)
if __name__ == "__main__":
cli = Cli()
opts = cli.parse()
opts.func(opts, c=cli.c)

284
integration/cli.py Normal file
View File

@ -0,0 +1,284 @@
import argparse
from requests import Session
def gitea_from_args(args, c: Session):
from .gitea import Gitea
return Gitea(
host=args.host,
username=args.username,
password=args.password,
email=args.email,
c=c,
)
class Gitea:
def __init__(self, parser, c: Session):
self.c = c
self.parser = parser
self.subparser = self.parser.add_subparsers()
self.install()
self.register()
self.login()
self.create_repository()
self.install_sso()
def __add_credentials_parser(self, parser):
group = parser.add_argument_group("credentials", "User credentials")
group.add_argument("username", type=str, help="Gitea user's username")
group.add_argument("password", type=str, help="Gitea user's password")
group.add_argument("email", type=str, help="Gitea user's email")
group.add_argument("host", type=str, help="URI at which Gitea is running")
def install(self):
def run(args, c: Session):
gitea = gitea_from_args(args, c=c)
gitea.install()
self.install_parser = self.subparser.add_parser(
name="install", description="Install Gitea", help="Install Gitea"
)
self.__add_credentials_parser(self.install_parser)
self.install_parser.set_defaults(func=run)
def register(self):
def run(args, c: Session):
gitea = gitea_from_args(args, c=c)
gitea.register()
self.register_parser = self.subparser.add_parser(
name="register",
description="Gitea user registration",
help="Register a user on Gitea",
)
self.__add_credentials_parser(self.register_parser)
self.register_parser.set_defaults(func=run)
def login(self):
def run(args, c: Session):
gitea = gitea_from_args(args, c=c)
gitea.login()
self.login_parser = self.subparser.add_parser(
name="login", description="Gitea user login", help="Login on Gitea"
)
self.__add_credentials_parser(self.login_parser)
self.login_parser.set_defaults(func=run)
def create_repository(self):
def run(args, c: Session):
gitea = gitea_from_args(args, c=c)
gitea.login()
gitea.create_repository(name=args.repo_name)
self.create_repository_parser = self.subparser.add_parser(
name="create_repo",
description="Create repository on Gitea",
help="Create repository on Gitea",
)
self.__add_credentials_parser(self.create_repository_parser)
self.create_repository_parser.set_defaults(func=run)
self.create_repository_parser.add_argument(
"repo_name", type=str, help="Name of the repository to be created"
)
def install_sso(self):
def run(args, c: Session):
gitea = gitea_from_args(args, c=c)
gitea.login()
print(f"CLIENT ID: {args.client_id}")
gitea.install_sso(
sso_name=args.sso_name,
client_id=args.client_id,
client_secret=args.client_secret,
sso_auto_discovery_url=args.sso_auto_discovery_url,
)
self.install_sso_parser = self.subparser.add_parser(
name="install_sso",
description="Install SSO on Gitea",
help="Install SSO on Gitea",
)
self.__add_credentials_parser(self.install_sso_parser)
self.install_sso_parser.add_argument(
"sso_name", type=str, help="(Human readable)Name of the SSO"
)
self.install_sso_parser.add_argument(
"client_id", type=str, help="Client ID generated by the SSO"
)
self.install_sso_parser.add_argument(
"client_secret", type=str, help="Client secret generated by the SSO"
)
self.install_sso_parser.add_argument(
"sso_auto_discovery_url",
type=str,
help="OIDC Auto Discovery URL of the SSO",
)
self.install_sso_parser.set_defaults(func=run)
def dash_from_args(args, c: Session):
from .hostea import Hostea
return Hostea(
username=args.username,
email=args.email,
password=args.password,
host=args.host,
c=c,
)
class Hostea:
def __init__(self, parser, c: Session):
self.c = c
self.parser = parser
self.subparser = self.parser.add_subparsers()
self.register()
self.login()
self.support()
def __add_credentials_parser(self, parser):
group = parser.add_argument_group("credentials", "User credentials")
group.add_argument("username", type=str, help="Hostea user's username")
group.add_argument("password", type=str, help="Hostea user's password")
group.add_argument("email", type=str, help="Hostea user's email")
group.add_argument("host", type=str, help="URI at which Hostea is running")
def register(self):
def run(args, c: Session):
dash = dash_from_args(args, c=c)
dash.register(maildev_host=args.maildev_host)
self.register_parser = self.subparser.add_parser(
name="register",
description="register new user ",
help="Register new user on Hostea Dashboard",
)
self.__add_credentials_parser(self.register_parser)
self.register_parser.add_argument(
"maildev_host", type=str, help="URI at which maildev is running"
)
self.register_parser.set_defaults(func=run)
def login(self):
def run(args, c: Session):
dash = dash_from_args(args, c=c)
dash.login()
self.login_parser = self.subparser.add_parser(
name="login",
description="login",
help="Login user on Hostea Dashboard",
)
self.__add_credentials_parser(self.login_parser)
self.login_parser.set_defaults(func=run)
def support(self):
def run(args, c: Session):
from .gitea import GiteaSSO
dash = dash_from_args(args, c=c)
dash.login()
gitea = GiteaSSO(
username=dash.username,
email=dash.email,
gitea_host=args.gitea_host,
hostea_org=args.gitea_hostea_org,
support_repo=args.support_repo,
c=c,
)
dash.new_ticket(gitea.new_issues_uri)
gitea.new_issue()
self.support_parser = self.subparser.add_parser(
name="support",
description="Hostea support",
help="Support functionality on Hostea Dashboard",
)
self.__add_credentials_parser(self.support_parser)
self.support_parser.add_argument(
"gitea_host", type=str, help="URI at which Gitea is running"
)
self.support_parser.add_argument(
"gitea_hostea_org",
type=str,
help="Hostea namespace(username/org) on Gitea, where support repository is hosted",
)
self.support_parser.add_argument(
"support_repo", type=str, help="support repository name"
)
self.support_parser.set_defaults(func=run)
class Cli:
def __init__(self):
c = Session()
self.c = c
self.parser = argparse.ArgumentParser(
description="Install and Bootstrap Gitea and Hostea Dashboard"
)
self.subparser = self.parser.add_subparsers()
self.check_env()
self.gitea()
self.hostea()
def __add_credentials_parser(self, parser):
group = parser.add_argument_group("credentials", "User credentials")
group.add_argument("username", type=str, help="Gitea user's username")
group.add_argument("password", type=str, help="Gitea user's password")
group.add_argument("email", type=str, help="Gitea user's email")
def check_env(self):
def run(args, c: Session):
from .gitea import Gitea
from .hostea import Hostea
Hostea.check_online(
dashboard_host=args.hostea_host, maildev_host=args.maildev_host
)
Gitea.check_online(host=args.gitea_host)
self.check_env_parser = self.subparser.add_parser(
name="check_env",
description="Check and block until environment is ready",
help="Check and block until environment is ready",
)
self.check_env_parser.add_argument(
"gitea_host", type=str, help="URI at which Gitea is running"
)
self.check_env_parser.add_argument(
"hostea_host", type=str, help="URI at which Hostea is running"
)
self.check_env_parser.add_argument(
"maildev_host", type=str, help="URI at which maildev is running"
)
self.check_env_parser.set_defaults(func=run)
def hostea(self):
self.hostea = self.subparser.add_parser(
name="hostea",
description="Hostea Dashboard",
help="Hostea Dashboard-related functionality",
)
Hostea(parser=self.hostea, c=self.c)
def gitea(self):
self.gitea = self.subparser.add_parser(
name="gitea",
description="Gitea",
help="Gitea-related functionality",
)
Gitea(parser=self.gitea, c=self.c)
def parse(self):
return self.parser.parse_args()

38
integration/csrf.py Normal file
View File

@ -0,0 +1,38 @@
from html.parser import HTMLParser
class ParseCSRF(HTMLParser):
token: str = None
def __init__(self, name):
HTMLParser.__init__(self)
self.name = name
# @classmethod
# def dashboard_parser(cls) -> "ParseCSRF":
# return cls(name="csrfmiddlewaretoken")
#
# @classmethod
# def gitea_parser(cls) -> "ParseCSRF":
# return cls(name="_csrf")
#
def handle_starttag(self, tag: str, attrs: (str, str)):
if self.token:
return
if tag != "input":
return
token = None
for (index, (k, v)) in enumerate(attrs):
if k == "value":
token = v
if all([k == "name", v == self.name]):
if token:
self.token = token
return
for (inner_index, (nk, nv)) in enumerate(attrs, start=index):
if nk == "value":
self.token = nv
return

449
integration/gitea.py Executable file
View File

@ -0,0 +1,449 @@
from urllib.parse import urlunparse, urlparse
from html.parser import HTMLParser
from time import sleep
import random
import argparse
from requests import Session
from requests.auth import HTTPBasicAuth
import requests
from .csrf import ParseCSRF
# GITEA_USER = "root"
# GITEA_EMAIL = "root@example.com"
# GITEA_PASSWORD = "foobarpassword"
# HOST = "http://localhost:8080"
#
# REPOS = []
class Gitea:
def __init__(self, host: str, username: str, password: str, email: str, c: Session):
self.host = host
self.username = username
self.password = password
self.email = email
self.c = c
self.__csrf_key = "_csrf"
self.__logged_in = False
def get_uri(self, path: str):
parsed = urlparse(self.host)
return urlunparse((parsed.scheme, parsed.netloc, path, "", "", ""))
def get_api_uri(self, path: str):
parsed = urlparse(self.host)
return urlunparse(
(
parsed.scheme,
f"{self.username}:{self.password}@{parsed.netloc}",
path,
"",
"",
"",
)
)
@staticmethod
def check_online(host: str):
"""
Check if Gitea instance is online
"""
count = 0
parsed = urlparse(host)
url = urlunparse((parsed.scheme, parsed.netloc, "api/v1/nodeinfo", "", "", ""))
while True:
try:
res = requests.get(url, allow_redirects=False)
if any([res.status_code == 302, res.status_code == 200]):
break
except:
sleep(2)
print(f"Retrying {count} time")
count += 1
continue
def install(self):
"""
Install Gitea, first form that a user sees when a new instance is
deployed
"""
payload = {
"db_type": "sqlite3",
"db_host": "localhost:3306",
"db_user": "root",
"db_passwd": "",
"db_name": "gitea",
"ssl_mode": "disable",
"db_schema": "",
"charset": "utf8",
"db_path": "/data/gitea/gitea.db",
"app_name": "Gitea:+Git+with+a+cup+of+tea",
"repo_root_path": "/data/git/repositories",
"lfs_root_path": "/data/git/lfs",
"run_user": "git",
"domain": "localhost",
"ssh_port": "2221",
"http_port": "3000",
"app_url": self.get_uri(""),
"log_root_path": "/data/gitea/log",
"smtp_host": "",
"smtp_from": "",
"smtp_user": "",
"smtp_passwd": "",
"enable_federated_avatar": "on",
"enable_open_id_sign_in": "on",
"enable_open_id_sign_up": "on",
"default_allow_create_organization": "on",
"default_enable_timetracking": "on",
"no_reply_address": "noreply.localhost",
"password_algorithm": "pbkdf2",
"admin_name": "",
"admin_passwd": "",
"admin_confirm_passwd": "",
"admin_email": "",
}
self.c.post(self.get_uri(""), data=payload)
sleep(10)
def get_csrf_token(self, url: str) -> str:
"""
Get CSRF token at a URI
"""
resp = self.c.get(url, allow_redirects=False)
if resp.status_code != 200 and resp.status_code != 302:
print(resp.status_code, resp.text)
raise Exception(f"Can't get csrf token: {resp.status_code}")
parser = ParseCSRF(name=self.__csrf_key)
parser.feed(resp.text)
csrf = parser.token
return csrf
def register(self):
"""
Register User
"""
url = self.get_uri("/user/sign_up")
csrf = self.get_csrf_token(url)
payload = {
"_csrf": csrf,
"user_name": self.username,
"password": self.password,
"retype": self.password,
"email": self.email,
}
self.c.post(url, data=payload, allow_redirects=False)
def login(self):
"""
Login, must be called at least once before performing authenticated
operations
"""
if self.__logged_in:
return
url = self.get_uri("/user/login")
csrf = self.get_csrf_token(url)
payload = {
"_csrf": csrf,
"user_name": self.username,
"password": self.password,
"remember": "on",
}
resp = self.c.post(url, data=payload, allow_redirects=False)
if any(
[resp.status_code == 302, resp.status_code == 200, resp.status_code == 303]
):
print("User logged in")
self.__logged_in = True
return
raise Exception(
f"[ERROR] Authentication failed. status code {resp.status_code}"
)
def create_repository(self, name: str):
"""
Create repository
"""
self.login()
def get_repository_payload(csrf: str, name: str, user_id: str):
data = {
"_csrf": csrf,
"uid": user_id,
"repo_name": name,
"description": f"this repository is named {name}",
"repo_template": "",
"issue_labels": "",
"gitignores": "",
"license": "",
"readme": "Default",
"default_branch": "master",
"trust_model": "default",
}
return data
url = self.get_uri("/repo/create")
user_id = self.c.get(self.get_api_uri("/api/v1/user")).json()["id"]
print(type(user_id))
print(user_id)
csrf = self.get_csrf_token(url)
data = get_repository_payload(csrf, name, user_id=user_id)
print(data)
print(type(data))
resp = self.c.post(url, data=data, allow_redirects=False)
print(f"Created repository {name}")
if resp.status_code != 302 and resp.status_code != 200:
raise Exception(
f"Error while creating repository: {name} {resp.status_code}"
)
def install_sso(
self,
sso_name: str,
client_id: str,
client_secret: str,
sso_auto_discovery_url: str,
):
self.login()
"""
Install SSO.
- sso_name: human readable SSO name. Doesn't have any functionality
except assisting admins ID an SSO by a name
- client_id: OAuth stuff, will be generated by the SSO that should be integrated
- client_secret: OAuth stuff, will be generated by the SSO that should be integrated
- sso_auto_discovery_url: Again OAuth stuff. Should be available in the SSO documentation.
In Hostea SSO's case, it is available at
https://hostea-dash.example.org/o/.well-known/openid-configuration/
"""
csrf = self.get_csrf_token(self.get_uri("/admin/auths/new"))
payload = {
"_autofill_dummy_username": "",
"_autofill_dummy_password": "",
"_csrf": csrf,
"type": "6",
"name": sso_name,
"security_protocol": "",
"host": "",
"port": "",
"bind_dn": "",
"bind_password": "",
"user_base": "",
"user_dn": "",
"filter": "",
"admin_filter": ["", ""],
"attribute_username": "",
"attribute_name": "",
"attribute_surname": "",
"attribute_mail": "",
"attribute_ssh_public_key": "",
"attribute_avatar": "",
"group_dn": "",
"group_member_uid": "",
"user_uid": "",
"group_filter": "",
"group_team_map": "",
"search_page_size": "",
"smtp_auth": "PLAIN",
"smtp_host": "",
"smtp_port": "",
"helo_hostname": "",
"allowed_domains": "",
"pam_service_name": "",
"pam_email_domain": "",
"oauth2_provider": "openidConnect",
"oauth2_key": client_id,
"oauth2_secret": client_secret,
"oauth2_icon_url": "",
"open_id_connect_auto_discovery_url": sso_auto_discovery_url,
"oauth2_auth_url": "",
"oauth2_token_url": "",
"oauth2_profile_url": "",
"oauth2_email_url": "",
"oauth2_tenant": "",
"oauth2_scopes": "openid",
"oauth2_required_claim_name": "",
"oauth2_required_claim_value": "",
"oauth2_group_claim_name": "",
"oauth2_admin_group": "",
"oauth2_restricted_group": "",
"sspi_auto_create_users": "on",
"sspi_auto_activate_users": "on",
"sspi_strip_domain_names": "on",
"sspi_separator_replacement": "_",
"sspi_default_language": "",
"is_sync_enabled": "on",
"is_active": "on",
}
resp = self.c.post(self.get_uri("/admin/auths/new"), data=payload)
def cli():
parser = argparse.ArguementParser(description="Install and Bootstrap Gitea")
parser.add_arguement("username", type=str, help="Gitea user's username")
parser.add_arguement("password", type=str, help="Gitea user's password")
parser.add_arguement("email", type=str, help="Gitea user's email")
args = parser.parse_args()
if __name__ == "__main__":
print("hello")
# args = cli()
# gitea = Gitea(
# host=HOST,
# username=GITEA_USER,
# password=GITEA_PASSWORD,
# email=GITEA_EMAIL,
# c=Session(),
# )
# gitea.check_online()
# print("Instace online")
# gitea.install()
# gitea.register()
# gitea.login()
# gitea.create_repository(name="support")
# client_id = ""
# client_secret = ""
# sso_auto_discovery_url = ""
# gitea.install_sso(
# sso_name="Hostea OIDC",
# client_id=client_id,
# client_secret=client_secret,
# sso_auto_discovery_url=sso_auto_discovery_url,
# )
class ParseSSOLogin(HTMLParser):
url: str = None
def handle_starttag(self, tag: str, attrs: (str, str)):
if self.url:
return
if tag != "a":
return
token = None
for (index, (k, v)) in enumerate(attrs):
if k == "href":
if "/user/oauth2/" in v:
self.url = v
return
class GiteaSSO:
def __init__(
self,
username: str,
email: str,
gitea_host: str,
hostea_org: str,
support_repo: str,
c: Session,
):
self.c = c
self.username = username
self.gitea_host = gitea_host
self.hostea_org = hostea_org
self.support_repo = support_repo
self.email = email
self.__csrf_key = "_csrf"
url = urlparse(self.gitea_host)
repo = f"{self.hostea_org}/{self.support_repo}"
issues = f"{repo}/issues"
new_issues = f"{issues}/new"
self.__partial_call_back_url = urlunparse(
(url.scheme, url.netloc, "/user/oauth2/", "", "", "")
)
self.__login = urlunparse((url.scheme, url.netloc, "/user/login/", "", "", ""))
self.__link_acount = urlunparse(
(url.scheme, url.netloc, "/user/link_account/", "", "", "")
)
self.__link_acount_signup = urlunparse(
(url.scheme, url.netloc, "/user/link_account_signup/", "", "", "")
)
self.__me = urlunparse(
(url.scheme, url.netloc, f"/{self.username}", "", "", "")
)
self.issues_uri = urlunparse((url.scheme, url.netloc, issues, "", "", ""))
self.new_issues_uri = urlunparse(
(url.scheme, url.netloc, new_issues, "", "", "")
)
def get_csrf(self, url: str) -> str:
resp = self.c.get(url)
parser = ParseCSRF(name=self.__csrf_key)
parser.feed(resp.text)
return parser.token
def _sso_login(self):
resp = self.c.get(self.__login)
parser = ParseSSOLogin()
parser.feed(resp.text)
url = urlparse(self.gitea_host)
## SSO URL in Gitea login page
sso = urlunparse((url.scheme, url.netloc, parser.url, "", "", ""))
# redirects are enabled to for a cleaner implementation. Commented out
# code below does the same in a step-by-step manner
resp = self.c.get(sso)
# resp = c.get(sso, allow_redirects=False)
# ## Visiting SSO URL redirects the user with HTTP 307 to the SSO for authorization
# assert resp.status_code == 307
# resp = c.get(resp.headers["Location"], allow_redirects=False)
# assert self.__partial_call_back_url in resp.headers["Location"]
# resp = c.get(resp.headers["Location"], allow_redirects=False)
# assert resp.status_code == 303
# assert resp.headers["Location"] in self.__link_acount
# to register account, the user has to visit form at self.__link_acount
csrf = self.get_csrf(self.__link_acount)
# which makes a POST request to self.__link_acount_signup
# weird, but have to go to above URL to collect CSRF toekn
payload = {
"user_name": self.username,
"email": self.email,
self.__csrf_key: csrf,
}
resp = self.c.post(self.__link_acount_signup, payload, allow_redirects=True)
assert resp.status_code == 200
# redirect mechanisms seems to change for every version
# print(resp.status_code)
# print(resp.headers["Location"])
# assert resp.status_code == 303
# assert resp.headers["Location"] == self.new_issues_uri
resp = self.c.get(self.__me)
assert resp.status_code == 200
assert self.username in resp.text
def new_issue(self):
resp = self.c.get(self.new_issues_uri, allow_redirects=False)
resp.status_code = 303
assert "/user/login" in resp.headers["Location"]
self._sso_login()
resp = self.c.get(self.new_issues_uri, allow_redirects=False)
assert resp.status_code == 200

129
integration/hostea.py Normal file
View File

@ -0,0 +1,129 @@
import logging
from urllib.parse import urlparse, urlunparse
from html.parser import HTMLParser
from time import sleep
from requests import Session
import requests
from .csrf import ParseCSRF
class Hostea:
def __init__(self, username: str, email: str, password: str, host: str, c: Session):
self.username = username
self.email = email
self.password = password
self.csrf_key = "csrfmiddlewaretoken"
self.host = host
self.c = c
@staticmethod
def check_online(dashboard_host: str, maildev_host: str):
"""
Check if Hostea Dashboard is online
"""
count = 0
dash_parsed = urlparse(dashboard_host)
maildev_parsed = urlparse(maildev_host)
urls = [
urlunparse((dash_parsed.scheme, dash_parsed.netloc, "/login/", "", "", "")),
urlunparse((maildev_parsed.scheme, maildev_parsed.netloc, "", "", "", "")),
]
for url in urls:
while True:
try:
res = requests.get(url, allow_redirects=False)
if any([res.status_code == 302, res.status_code == 200]):
break
except Exception as e:
sleep(2)
print(e)
print(f"[Hostea] Retrying {count} time for {url}")
count += 1
continue
def get_uri(self, path: str):
parsed = urlparse(self.host)
return urlunparse((parsed.scheme, parsed.netloc, path, "", "", ""))
def get_csrf(self, url: str) -> str:
resp = self.c.get(url=url)
assert resp.status_code == 200
parser = ParseCSRF(name=self.csrf_key)
parser.feed(resp.text)
csrf = parser.token
return csrf
def __get_verification_link(self, maildev_host: str):
def maildev_uri(maildev_host: str, path: str):
parsed = urlparse(maildev_host)
return urlunparse((parsed.scheme, parsed.netloc, path, "", "", ""))
resp = self.c.get(maildev_uri(maildev_host=maildev_host, path="/email/"))
# resp = self.c.get("http://localhost:1080/email/")
emails = resp.json()
for email in emails:
if email["to"][0]["address"] == self.email:
logging.info("[Dashboard] Found verification link")
resp = self.c.delete(
maildev_uri(maildev_host=maildev_host, path=f"/email/{email['id']}")
)
return str.strip(email["text"].split("\n")[1])
logging.critical("[Dashboard] Verification link not found")
def register(self, maildev_host: str):
url = self.get_uri("/register/")
csrf = self.get_csrf(url)
payload = {
"username": self.username,
"password": self.password,
"email": self.email,
"confirm_password": self.password,
self.csrf_key: csrf,
}
logging.info("Registering user")
resp = self.c.post(url, payload, allow_redirects=False)
assert resp.status_code == 302
assert "pending" in resp.headers["Location"]
email_verification_link = self.__get_verification_link(
maildev_host=maildev_host
)
csrf = self.get_csrf(email_verification_link)
payload = {
self.csrf_key: csrf,
}
resp = self.c.post(email_verification_link, payload, allow_redirects=False)
assert resp.status_code == 302
assert resp.headers["Location"] == "/login/"
logging.info("[Dashboard] Email verified user")
def login(self):
url = self.get_uri("/login/")
csrf = self.get_csrf(url)
payload = {
"login": self.username,
"password": self.password,
self.csrf_key: csrf,
}
logging.info("Logging In user")
resp = self.c.post(url, payload, allow_redirects=False)
assert resp.status_code == 302
assert resp.headers["Location"] == "/"
resp = self.c.get(self.get_uri("/support/new/"))
assert resp.status_code == 200
def new_ticket(self, support_repository_new_issue: str):
resp = self.c.get(self.get_uri("/support/new/"))
# print(resp.text)
# print(support_repository_new_issue)
# assert support_repository_new_issue in resp.text

View File

@ -1,276 +0,0 @@
import logging
from urllib.parse import urlparse, urlunparse
from html.parser import HTMLParser
from requests import Session
logging.basicConfig(level=logging.INFO)
c = Session()
class ParseCSRF(HTMLParser):
token: str = None
def __init__(self, name):
HTMLParser.__init__(self)
self.name = name
@classmethod
def dashboard_parser(cls) -> "ParseCSRF":
return cls(name="csrfmiddlewaretoken")
@classmethod
def gitea_parser(cls) -> "ParseCSRF":
return cls(name="_csrf")
def handle_starttag(self, tag: str, attrs: (str, str)):
if self.token:
return
if tag != "input":
return
token = None
for (index, (k, v)) in enumerate(attrs):
if k == "value":
token = v
if all([k == "name", v == self.name]):
if token:
self.token = token
return
for (inner_index, (nk, nv)) in enumerate(attrs, start=index):
if nk == "value":
self.token = nv
return
class ParseSSOLogin(HTMLParser):
url: str = None
def handle_starttag(self, tag: str, attrs: (str, str)):
if self.url:
return
if tag != "a":
return
token = None
for (index, (k, v)) in enumerate(attrs):
if k == "href":
if "/user/oauth2/" in v:
self.url = v
return
class Hostea:
def __init__(self, username: str, email: str, password: str, url: str):
self.username = username
self.email = email
self.password = password
self.url = urlparse(url)
self.csrf_key = "csrfmiddlewaretoken"
def get_csrf(self, url: str) -> str:
resp = c.get(url)
assert resp.status_code == 200
parser = ParseCSRF(name=self.csrf_key)
parser.feed(resp.text)
csrf = parser.token
return csrf
def __get_verification_link(self):
resp = c.get("http://localhost:1080/email/")
emails = resp.json()
for email in emails:
if email["to"][0]["address"] == self.email:
logging.info("[Dashboard] Found verification link")
resp = c.delete(f"http://localhost:1080/email/{email['id']}")
return str.strip(email["text"].split("\n")[1])
logging.critical("[Dashboard] Verification link not found")
def register(self):
url = urlunparse((self.url.scheme, self.url.netloc, "/register/", "", "", ""))
csrf = self.get_csrf(url)
payload = {
"username": self.username,
"password": self.password,
"email": self.email,
"confirm_password": self.password,
self.csrf_key: csrf,
}
logging.info("Registering user")
resp = c.post(url, payload, allow_redirects=False)
assert resp.status_code == 302
assert "pending" in resp.headers["Location"]
email_verification_link = self.__get_verification_link()
csrf = self.get_csrf(email_verification_link)
payload = {
self.csrf_key: csrf,
}
resp = c.post(email_verification_link, payload, allow_redirects=False)
assert resp.status_code == 302
assert resp.headers["Location"] == "/login/"
logging.info("[Dashboard] Email verified user")
def login(self):
url = urlunparse((self.url.scheme, self.url.netloc, "/login/", "", "", ""))
csrf = self.get_csrf(url)
payload = {
"login": self.username,
"password": self.password,
self.csrf_key: csrf,
}
logging.info("Logging In user")
resp = c.post(url, payload, allow_redirects=False)
assert resp.status_code == 302
assert resp.headers["Location"] == "/"
url = urlunparse(
(self.url.scheme, self.url.netloc, "/support/new/", "", "", "")
)
resp = c.get(url)
assert resp.status_code == 200
def new_ticket(self, support_repository_new_issue: str):
url = urlunparse(
(self.url.scheme, self.url.netloc, "/support/new/", "", "", "")
)
resp = c.get(url)
assert support_repository_new_issue in resp.text
class Gitea:
def __init__(
self,
username: str,
email: str,
gitea_host: str,
hostea_org: str,
support_repo: str,
):
self.username = username
self.gitea_host = gitea_host
self.hostea_org = hostea_org
self.support_repo = support_repo
self.email = email
self.__csrf_key = "_csrf"
url = urlparse(self.gitea_host)
repo = f"{self.hostea_org}/{self.support_repo}"
issues = f"{repo}/issues"
new_issues = f"{issues}/new"
self.__partial_call_back_url = urlunparse(
(url.scheme, url.netloc, "/user/oauth2/", "", "", "")
)
self.__login = urlunparse((url.scheme, url.netloc, "/user/login/", "", "", ""))
self.__link_acount = urlunparse(
(url.scheme, url.netloc, "/user/link_account/", "", "", "")
)
self.__link_acount_signup = urlunparse(
(url.scheme, url.netloc, "/user/link_account_signup/", "", "", "")
)
self.__me = urlunparse(
(url.scheme, url.netloc, f"/{self.username}", "", "", "")
)
self.issues_uri = urlunparse((url.scheme, url.netloc, issues, "", "", ""))
self.new_issues_uri = urlunparse(
(url.scheme, url.netloc, new_issues, "", "", "")
)
def get_csrf(self, url: str) -> str:
resp = c.get(url)
parser = ParseCSRF.gitea_parser()
parser.feed(resp.text)
return parser.token
def _sso_login(self):
resp = c.get(self.__login)
parser = ParseSSOLogin()
parser.feed(resp.text)
url = urlparse(self.gitea_host)
## SSO URL in Gitea login page
sso = urlunparse((url.scheme, url.netloc, parser.url, "", "", ""))
# redirects are enabled to for a cleaner implementation. Commented out
# code below does the same in a step-by-step manner
resp = c.get(sso)
# resp = c.get(sso, allow_redirects=False)
# ## Visiting SSO URL redirects the user with HTTP 307 to the SSO for authorization
# assert resp.status_code == 307
# resp = c.get(resp.headers["Location"], allow_redirects=False)
# assert self.__partial_call_back_url in resp.headers["Location"]
# resp = c.get(resp.headers["Location"], allow_redirects=False)
# assert resp.status_code == 303
# assert resp.headers["Location"] in self.__link_acount
# to register account, the user has to visit form at self.__link_acount
csrf = self.get_csrf(self.__link_acount)
# which makes a POST request to self.__link_acount_signup
# weird, but have to go to above URL to collect CSRF toekn
payload = {
"user_name": self.username,
"email": self.email,
self.__csrf_key: csrf,
}
resp = c.post(self.__link_acount_signup, payload, allow_redirects=False)
assert resp.status_code == 303
assert resp.headers["Location"] == "/Hostea/support/issues/new"
resp = c.get(self.__me)
assert resp.status_code == 200
assert self.username in resp.text
def new_issue(self):
resp = c.get(self.new_issues_uri, allow_redirects=False)
resp.status_code = 303
assert "/user/login" in resp.headers["Location"]
self._sso_login()
resp = c.get(self.new_issues_uri, allow_redirects=False)
assert resp.status_code == 200
def main():
dash = Hostea(
username="enough",
email="enough@example.org",
password="asdfas234234vaa",
url="http://localhost:8000",
)
gitea = Gitea(
gitea_host="http://localhost:8080",
username=dash.username,
email=dash.email,
hostea_org="Hostea",
support_repo="support",
)
dash.register()
dash.login()
dash.new_ticket(gitea.new_issues_uri)
gitea.new_issue()
logging.info("All tests passed")
if __name__ == "__main__":
main()

149
integration/tests.sh Executable file
View File

@ -0,0 +1,149 @@
#!/bin/bash
set -Exeuo pipefail
readonly DASHBOARD_URL="http://localhost:8000"
readonly GITEA_URL="http://localhost:3000"
readonly MAILDEV_URL="http://localhost:1080"
readonly DASHBOARD_OIDC_DISCOVERY_URL="$DASHBOARD_URL/o/.well-known/openid-configuration/"
readonly DASHBOARD_ADMIN_USERNAME=root
readonly DASHBOARD_ADMIN_PASSWORD=supercomplicatedpassword
readonly DASHBOARD_ADMIN_EMAIL="$DASHBOARD_ADMIN_USERNAME@dash.example.org"
readonly DASHBOARD_OIDC_APP_NAME=hostea-gitea
readonly GITEA_ROOT_USERNAME=root
readonly GITEA_ROOT_EMAIL="$GITEA_ROOT_USERNAME@example.org"
readonly GITEA_ROOT_PASSOWRD=supercomplicatedpassword
readonly GITEA_HOSTEA_SSO_NAME=hostea-sso
readonly GITEA_OIDC_CALLBACK="$GITEA_URL/user/oauth2/$GITEA_HOSTEA_SSO_NAME/callback"
readonly GITEA_HOSTEA_USERNAME=hostea
readonly GITEA_HOSTEA_PASSWORD=supercomplicatedpassword
readonly GITEA_HOSTEA_EMAIL="$GITEA_HOSTEA_USERNAME@example.org"
readonly GITEA_HOSTEA_SUPPORT_REPO="support"
readonly HOSTEA_CUSTOMER_USERNAME=batman
readonly HOSTEA_CUSTOMER_PASSWORD=supercomplicatedpassword
readonly HOSTEA_CUSTOMER_EMAIL="$HOSTEA_CUSTOMER_USERNAME@example.org"
OIDC_CLIENT_ID=""
OIDC_CLIENT_SECRET=""
wait_for_env() {
python -m integration \
check_env $GITEA_URL $DASHBOARD_URL $MAILDEV_URL
}
# create OIDC app on Hostea Dashboard
oidc_dashboard_init() {
python -m integration \
hostea register \
$DASHBOARD_ADMIN_USERNAME $DASHBOARD_ADMIN_PASSWORD \
$DASHBOARD_ADMIN_EMAIL \
$DASHBOARD_URL \
$MAILDEV_URL
resp=$(python manage.py create_oidc \
$DASHBOARD_OIDC_APP_NAME $DASHBOARD_ADMIN_USERNAME \
$GITEA_OIDC_CALLBACK)
OIDC_CLIENT_ID=$(echo $resp | cut -d ":" -f 2 | cut -d " " -f 2)
OIDC_CLIENT_SECRET=$(echo $resp | cut -d ":" -f 3 | cut -d " " -f 2)
}
# register root user on Gitea to simulate Hoste admin and integrate SSO
gitea_root(){
python -m integration \
gitea install \
$GITEA_ROOT_USERNAME $GITEA_ROOT_PASSOWRD \
$GITEA_ROOT_EMAIL \
$GITEA_URL
python -m integration \
gitea register \
$GITEA_ROOT_USERNAME $GITEA_ROOT_PASSOWRD \
$GITEA_ROOT_EMAIL \
$GITEA_URL
python -m integration \
gitea login \
$GITEA_ROOT_USERNAME $GITEA_ROOT_PASSOWRD \
$GITEA_ROOT_EMAIL \
$GITEA_URL
python -m integration \
gitea install_sso \
$GITEA_ROOT_USERNAME $GITEA_ROOT_PASSOWRD \
$GITEA_ROOT_EMAIL \
$GITEA_URL \
$GITEA_HOSTEA_SSO_NAME \
$OIDC_CLIENT_ID $OIDC_CLIENT_SECRET \
$DASHBOARD_OIDC_DISCOVERY_URL
}
# register user "Hostea" on Gitea and create support repository
support_repo_init() {
python -m integration \
gitea register \
$GITEA_HOSTEA_USERNAME $GITEA_HOSTEA_PASSWORD \
$GITEA_HOSTEA_EMAIL \
$GITEA_URL
python -m integration \
gitea login \
$GITEA_HOSTEA_USERNAME $GITEA_HOSTEA_PASSWORD \
$GITEA_HOSTEA_EMAIL \
$GITEA_URL
python -m integration \
gitea create_repo \
$GITEA_HOSTEA_USERNAME $GITEA_HOSTEA_PASSWORD \
$GITEA_HOSTEA_EMAIL \
$GITEA_URL \
$GITEA_HOSTEA_SUPPORT_REPO
}
# Create user on Hostea to simulate a Hostea customer
hostea_customer_simulation() {
python -m integration \
hostea register \
$HOSTEA_CUSTOMER_USERNAME $HOSTEA_CUSTOMER_PASSWORD \
$HOSTEA_CUSTOMER_EMAIL \
$DASHBOARD_URL \
$MAILDEV_URL
python -m integration \
hostea login \
$HOSTEA_CUSTOMER_USERNAME $HOSTEA_CUSTOMER_PASSWORD \
$HOSTEA_CUSTOMER_EMAIL $DASHBOARD_URL
python -m integration \
hostea support \
$HOSTEA_CUSTOMER_USERNAME $HOSTEA_CUSTOMER_PASSWORD \
$HOSTEA_CUSTOMER_EMAIL \
$DASHBOARD_URL \
$GITEA_URL \
$GITEA_HOSTEA_USERNAME $GITEA_HOSTEA_SUPPORT_REPO
}
SERVER_PID=""
setup_env() {
nohup python manage.py runserver > /dev/null 2>&1 &
SERVER_PID=$!
docker-compose -f docker-compose-dev-deps.yml up --detach
}
teardown_env() {
docker-compose -f docker-compose-dev-deps.yml down --remove-orphans
kill $SERVER_PID
}
main() {
teardown_env || true
setup_env
wait_for_env
oidc_dashboard_init
echo "OIDC APP initialized. CLIENT_ID: $OIDC_CLIENT_ID CLIENT SECRET: $OIDC_CLIENT_SECRET"
gitea_root
support_repo_init
hostea_customer_simulation
teardown_env
echo "All Good! :)"
}
main