From 798a2f03d97ccb0f237a2be9ef75118f9521e095 Mon Sep 17 00:00:00 2001 From: realaravinth Date: Thu, 23 Jun 2022 20:59:22 +0530 Subject: [PATCH] feat: integration testing SUMMARY integration/__main__.py is a CLI-based HTTP client that can interact with Hostea Dashboard and Gitea. Integration tests are run via integration/tests.sh, which is a driver for the HTTP client at integration/__main__.py. The script is capable of spinning up a test environment consisting of services defined in docker-compose-dev-deps.yml and the Hostea Dashboard and tearing it down after a successful run. The credentials used to create various accounts and other parameters are all defined in integration/tests.sh script it self. So it is self contained. CLIENT FUNCTIONALITY: HOSTEA DASHBOARD: - register user with email verification - login - create OIDC app - visit support page GITEA: - Install Gitea(DB configuration, etc. The first form that's presented to the visitor after a new instance is deployed) - Register User - Login User - Create repository - Configure OIDC SSO - Login via SSO --- Makefile | 3 + integration/__init__.py | 0 integration/__main__.py | 13 ++ integration/cli.py | 284 +++++++++++++++++++++++++ integration/csrf.py | 38 ++++ integration/gitea.py | 449 ++++++++++++++++++++++++++++++++++++++++ integration/hostea.py | 129 ++++++++++++ integration/tests.py | 276 ------------------------ integration/tests.sh | 149 +++++++++++++ 9 files changed, 1065 insertions(+), 276 deletions(-) create mode 100644 integration/__init__.py create mode 100644 integration/__main__.py create mode 100644 integration/cli.py create mode 100644 integration/csrf.py create mode 100755 integration/gitea.py create mode 100644 integration/hostea.py delete mode 100644 integration/tests.py create mode 100755 integration/tests.sh diff --git a/Makefile b/Makefile index 1dcabd5..4246fe6 100644 --- a/Makefile +++ b/Makefile @@ -32,6 +32,9 @@ freeze: ## Freeze python dependencies help: ## Prints help for targets with comments @cat $(MAKEFILE_LIST) | grep -E '^[a-zA-Z_-]+:.*?## .*$$' | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' +integration-test: ## run integration tests + . ./venv/bin/activate && integration/tests.sh + lint: ## Run linter @./venv/bin/black ./dashboard/ @./venv/bin/black ./accounts/ diff --git a/integration/__init__.py b/integration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/integration/__main__.py b/integration/__main__.py new file mode 100644 index 0000000..23bf4ea --- /dev/null +++ b/integration/__main__.py @@ -0,0 +1,13 @@ +import argparse + +from .cli import Cli + + +def admin(args): + print(args) + + +if __name__ == "__main__": + cli = Cli() + opts = cli.parse() + opts.func(opts, c=cli.c) diff --git a/integration/cli.py b/integration/cli.py new file mode 100644 index 0000000..7ca3516 --- /dev/null +++ b/integration/cli.py @@ -0,0 +1,284 @@ +import argparse + +from requests import Session + + +def gitea_from_args(args, c: Session): + from .gitea import Gitea + + return Gitea( + host=args.host, + username=args.username, + password=args.password, + email=args.email, + c=c, + ) + + +class Gitea: + def __init__(self, parser, c: Session): + self.c = c + self.parser = parser + self.subparser = self.parser.add_subparsers() + self.install() + self.register() + self.login() + self.create_repository() + self.install_sso() + + def __add_credentials_parser(self, parser): + group = parser.add_argument_group("credentials", "User credentials") + group.add_argument("username", type=str, help="Gitea user's username") + group.add_argument("password", type=str, help="Gitea user's password") + group.add_argument("email", type=str, help="Gitea user's email") + group.add_argument("host", type=str, help="URI at which Gitea is running") + + def install(self): + def run(args, c: Session): + gitea = gitea_from_args(args, c=c) + gitea.install() + + self.install_parser = self.subparser.add_parser( + name="install", description="Install Gitea", help="Install Gitea" + ) + self.__add_credentials_parser(self.install_parser) + self.install_parser.set_defaults(func=run) + + def register(self): + def run(args, c: Session): + gitea = gitea_from_args(args, c=c) + gitea.register() + + self.register_parser = self.subparser.add_parser( + name="register", + description="Gitea user registration", + help="Register a user on Gitea", + ) + self.__add_credentials_parser(self.register_parser) + self.register_parser.set_defaults(func=run) + + def login(self): + def run(args, c: Session): + gitea = gitea_from_args(args, c=c) + gitea.login() + + self.login_parser = self.subparser.add_parser( + name="login", description="Gitea user login", help="Login on Gitea" + ) + self.__add_credentials_parser(self.login_parser) + self.login_parser.set_defaults(func=run) + + def create_repository(self): + def run(args, c: Session): + gitea = gitea_from_args(args, c=c) + gitea.login() + gitea.create_repository(name=args.repo_name) + + self.create_repository_parser = self.subparser.add_parser( + name="create_repo", + description="Create repository on Gitea", + help="Create repository on Gitea", + ) + self.__add_credentials_parser(self.create_repository_parser) + self.create_repository_parser.set_defaults(func=run) + self.create_repository_parser.add_argument( + "repo_name", type=str, help="Name of the repository to be created" + ) + + def install_sso(self): + def run(args, c: Session): + gitea = gitea_from_args(args, c=c) + gitea.login() + print(f"CLIENT ID: {args.client_id}") + gitea.install_sso( + sso_name=args.sso_name, + client_id=args.client_id, + client_secret=args.client_secret, + sso_auto_discovery_url=args.sso_auto_discovery_url, + ) + + self.install_sso_parser = self.subparser.add_parser( + name="install_sso", + description="Install SSO on Gitea", + help="Install SSO on Gitea", + ) + self.__add_credentials_parser(self.install_sso_parser) + self.install_sso_parser.add_argument( + "sso_name", type=str, help="(Human readable)Name of the SSO" + ) + self.install_sso_parser.add_argument( + "client_id", type=str, help="Client ID generated by the SSO" + ) + self.install_sso_parser.add_argument( + "client_secret", type=str, help="Client secret generated by the SSO" + ) + self.install_sso_parser.add_argument( + "sso_auto_discovery_url", + type=str, + help="OIDC Auto Discovery URL of the SSO", + ) + + self.install_sso_parser.set_defaults(func=run) + + +def dash_from_args(args, c: Session): + from .hostea import Hostea + + return Hostea( + username=args.username, + email=args.email, + password=args.password, + host=args.host, + c=c, + ) + + +class Hostea: + def __init__(self, parser, c: Session): + self.c = c + self.parser = parser + self.subparser = self.parser.add_subparsers() + self.register() + self.login() + self.support() + + def __add_credentials_parser(self, parser): + group = parser.add_argument_group("credentials", "User credentials") + group.add_argument("username", type=str, help="Hostea user's username") + group.add_argument("password", type=str, help="Hostea user's password") + group.add_argument("email", type=str, help="Hostea user's email") + group.add_argument("host", type=str, help="URI at which Hostea is running") + + def register(self): + def run(args, c: Session): + dash = dash_from_args(args, c=c) + dash.register(maildev_host=args.maildev_host) + + self.register_parser = self.subparser.add_parser( + name="register", + description="register new user ", + help="Register new user on Hostea Dashboard", + ) + self.__add_credentials_parser(self.register_parser) + self.register_parser.add_argument( + "maildev_host", type=str, help="URI at which maildev is running" + ) + self.register_parser.set_defaults(func=run) + + def login(self): + def run(args, c: Session): + dash = dash_from_args(args, c=c) + dash.login() + + self.login_parser = self.subparser.add_parser( + name="login", + description="login", + help="Login user on Hostea Dashboard", + ) + self.__add_credentials_parser(self.login_parser) + self.login_parser.set_defaults(func=run) + + def support(self): + def run(args, c: Session): + from .gitea import GiteaSSO + + dash = dash_from_args(args, c=c) + dash.login() + + gitea = GiteaSSO( + username=dash.username, + email=dash.email, + gitea_host=args.gitea_host, + hostea_org=args.gitea_hostea_org, + support_repo=args.support_repo, + c=c, + ) + dash.new_ticket(gitea.new_issues_uri) + gitea.new_issue() + + self.support_parser = self.subparser.add_parser( + name="support", + description="Hostea support", + help="Support functionality on Hostea Dashboard", + ) + self.__add_credentials_parser(self.support_parser) + self.support_parser.add_argument( + "gitea_host", type=str, help="URI at which Gitea is running" + ) + self.support_parser.add_argument( + "gitea_hostea_org", + type=str, + help="Hostea namespace(username/org) on Gitea, where support repository is hosted", + ) + self.support_parser.add_argument( + "support_repo", type=str, help="support repository name" + ) + + self.support_parser.set_defaults(func=run) + + +class Cli: + def __init__(self): + c = Session() + self.c = c + self.parser = argparse.ArgumentParser( + description="Install and Bootstrap Gitea and Hostea Dashboard" + ) + self.subparser = self.parser.add_subparsers() + self.check_env() + self.gitea() + self.hostea() + + def __add_credentials_parser(self, parser): + group = parser.add_argument_group("credentials", "User credentials") + group.add_argument("username", type=str, help="Gitea user's username") + group.add_argument("password", type=str, help="Gitea user's password") + group.add_argument("email", type=str, help="Gitea user's email") + + def check_env(self): + def run(args, c: Session): + from .gitea import Gitea + from .hostea import Hostea + + Hostea.check_online( + dashboard_host=args.hostea_host, maildev_host=args.maildev_host + ) + Gitea.check_online(host=args.gitea_host) + + self.check_env_parser = self.subparser.add_parser( + name="check_env", + description="Check and block until environment is ready", + help="Check and block until environment is ready", + ) + + self.check_env_parser.add_argument( + "gitea_host", type=str, help="URI at which Gitea is running" + ) + + self.check_env_parser.add_argument( + "hostea_host", type=str, help="URI at which Hostea is running" + ) + + self.check_env_parser.add_argument( + "maildev_host", type=str, help="URI at which maildev is running" + ) + self.check_env_parser.set_defaults(func=run) + + def hostea(self): + self.hostea = self.subparser.add_parser( + name="hostea", + description="Hostea Dashboard", + help="Hostea Dashboard-related functionality", + ) + Hostea(parser=self.hostea, c=self.c) + + def gitea(self): + self.gitea = self.subparser.add_parser( + name="gitea", + description="Gitea", + help="Gitea-related functionality", + ) + Gitea(parser=self.gitea, c=self.c) + + def parse(self): + return self.parser.parse_args() diff --git a/integration/csrf.py b/integration/csrf.py new file mode 100644 index 0000000..bee85ca --- /dev/null +++ b/integration/csrf.py @@ -0,0 +1,38 @@ +from html.parser import HTMLParser + + +class ParseCSRF(HTMLParser): + token: str = None + + def __init__(self, name): + HTMLParser.__init__(self) + self.name = name + + # @classmethod + # def dashboard_parser(cls) -> "ParseCSRF": + # return cls(name="csrfmiddlewaretoken") + # + # @classmethod + # def gitea_parser(cls) -> "ParseCSRF": + # return cls(name="_csrf") + # + def handle_starttag(self, tag: str, attrs: (str, str)): + if self.token: + return + + if tag != "input": + return + + token = None + for (index, (k, v)) in enumerate(attrs): + if k == "value": + token = v + + if all([k == "name", v == self.name]): + if token: + self.token = token + return + for (inner_index, (nk, nv)) in enumerate(attrs, start=index): + if nk == "value": + self.token = nv + return diff --git a/integration/gitea.py b/integration/gitea.py new file mode 100755 index 0000000..937236f --- /dev/null +++ b/integration/gitea.py @@ -0,0 +1,449 @@ +from urllib.parse import urlunparse, urlparse +from html.parser import HTMLParser +from time import sleep +import random +import argparse + +from requests import Session +from requests.auth import HTTPBasicAuth +import requests + +from .csrf import ParseCSRF + +# GITEA_USER = "root" +# GITEA_EMAIL = "root@example.com" +# GITEA_PASSWORD = "foobarpassword" +# HOST = "http://localhost:8080" +# +# REPOS = [] + + +class Gitea: + def __init__(self, host: str, username: str, password: str, email: str, c: Session): + self.host = host + self.username = username + self.password = password + self.email = email + self.c = c + self.__csrf_key = "_csrf" + self.__logged_in = False + + def get_uri(self, path: str): + parsed = urlparse(self.host) + return urlunparse((parsed.scheme, parsed.netloc, path, "", "", "")) + + def get_api_uri(self, path: str): + parsed = urlparse(self.host) + return urlunparse( + ( + parsed.scheme, + f"{self.username}:{self.password}@{parsed.netloc}", + path, + "", + "", + "", + ) + ) + + @staticmethod + def check_online(host: str): + """ + Check if Gitea instance is online + """ + count = 0 + parsed = urlparse(host) + url = urlunparse((parsed.scheme, parsed.netloc, "api/v1/nodeinfo", "", "", "")) + + while True: + try: + res = requests.get(url, allow_redirects=False) + if any([res.status_code == 302, res.status_code == 200]): + break + except: + sleep(2) + print(f"Retrying {count} time") + count += 1 + continue + + def install(self): + """ + Install Gitea, first form that a user sees when a new instance is + deployed + """ + payload = { + "db_type": "sqlite3", + "db_host": "localhost:3306", + "db_user": "root", + "db_passwd": "", + "db_name": "gitea", + "ssl_mode": "disable", + "db_schema": "", + "charset": "utf8", + "db_path": "/data/gitea/gitea.db", + "app_name": "Gitea:+Git+with+a+cup+of+tea", + "repo_root_path": "/data/git/repositories", + "lfs_root_path": "/data/git/lfs", + "run_user": "git", + "domain": "localhost", + "ssh_port": "2221", + "http_port": "3000", + "app_url": self.get_uri(""), + "log_root_path": "/data/gitea/log", + "smtp_host": "", + "smtp_from": "", + "smtp_user": "", + "smtp_passwd": "", + "enable_federated_avatar": "on", + "enable_open_id_sign_in": "on", + "enable_open_id_sign_up": "on", + "default_allow_create_organization": "on", + "default_enable_timetracking": "on", + "no_reply_address": "noreply.localhost", + "password_algorithm": "pbkdf2", + "admin_name": "", + "admin_passwd": "", + "admin_confirm_passwd": "", + "admin_email": "", + } + self.c.post(self.get_uri(""), data=payload) + sleep(10) + + def get_csrf_token(self, url: str) -> str: + """ + Get CSRF token at a URI + """ + resp = self.c.get(url, allow_redirects=False) + if resp.status_code != 200 and resp.status_code != 302: + print(resp.status_code, resp.text) + raise Exception(f"Can't get csrf token: {resp.status_code}") + parser = ParseCSRF(name=self.__csrf_key) + parser.feed(resp.text) + csrf = parser.token + return csrf + + def register(self): + """ + Register User + """ + url = self.get_uri("/user/sign_up") + csrf = self.get_csrf_token(url) + payload = { + "_csrf": csrf, + "user_name": self.username, + "password": self.password, + "retype": self.password, + "email": self.email, + } + self.c.post(url, data=payload, allow_redirects=False) + + def login(self): + """ + Login, must be called at least once before performing authenticated + operations + """ + if self.__logged_in: + return + url = self.get_uri("/user/login") + csrf = self.get_csrf_token(url) + payload = { + "_csrf": csrf, + "user_name": self.username, + "password": self.password, + "remember": "on", + } + resp = self.c.post(url, data=payload, allow_redirects=False) + if any( + [resp.status_code == 302, resp.status_code == 200, resp.status_code == 303] + ): + print("User logged in") + self.__logged_in = True + return + + raise Exception( + f"[ERROR] Authentication failed. status code {resp.status_code}" + ) + + def create_repository(self, name: str): + """ + Create repository + """ + self.login() + + def get_repository_payload(csrf: str, name: str, user_id: str): + data = { + "_csrf": csrf, + "uid": user_id, + "repo_name": name, + "description": f"this repository is named {name}", + "repo_template": "", + "issue_labels": "", + "gitignores": "", + "license": "", + "readme": "Default", + "default_branch": "master", + "trust_model": "default", + } + return data + + url = self.get_uri("/repo/create") + user_id = self.c.get(self.get_api_uri("/api/v1/user")).json()["id"] + print(type(user_id)) + print(user_id) + + csrf = self.get_csrf_token(url) + data = get_repository_payload(csrf, name, user_id=user_id) + print(data) + print(type(data)) + + resp = self.c.post(url, data=data, allow_redirects=False) + print(f"Created repository {name}") + if resp.status_code != 302 and resp.status_code != 200: + raise Exception( + f"Error while creating repository: {name} {resp.status_code}" + ) + + def install_sso( + self, + sso_name: str, + client_id: str, + client_secret: str, + sso_auto_discovery_url: str, + ): + self.login() + """ + Install SSO. + + - sso_name: human readable SSO name. Doesn't have any functionality + except assisting admins ID an SSO by a name + + - client_id: OAuth stuff, will be generated by the SSO that should be integrated + + - client_secret: OAuth stuff, will be generated by the SSO that should be integrated + + - sso_auto_discovery_url: Again OAuth stuff. Should be available in the SSO documentation. + In Hostea SSO's case, it is available at + https://hostea-dash.example.org/o/.well-known/openid-configuration/ + """ + csrf = self.get_csrf_token(self.get_uri("/admin/auths/new")) + payload = { + "_autofill_dummy_username": "", + "_autofill_dummy_password": "", + "_csrf": csrf, + "type": "6", + "name": sso_name, + "security_protocol": "", + "host": "", + "port": "", + "bind_dn": "", + "bind_password": "", + "user_base": "", + "user_dn": "", + "filter": "", + "admin_filter": ["", ""], + "attribute_username": "", + "attribute_name": "", + "attribute_surname": "", + "attribute_mail": "", + "attribute_ssh_public_key": "", + "attribute_avatar": "", + "group_dn": "", + "group_member_uid": "", + "user_uid": "", + "group_filter": "", + "group_team_map": "", + "search_page_size": "", + "smtp_auth": "PLAIN", + "smtp_host": "", + "smtp_port": "", + "helo_hostname": "", + "allowed_domains": "", + "pam_service_name": "", + "pam_email_domain": "", + "oauth2_provider": "openidConnect", + "oauth2_key": client_id, + "oauth2_secret": client_secret, + "oauth2_icon_url": "", + "open_id_connect_auto_discovery_url": sso_auto_discovery_url, + "oauth2_auth_url": "", + "oauth2_token_url": "", + "oauth2_profile_url": "", + "oauth2_email_url": "", + "oauth2_tenant": "", + "oauth2_scopes": "openid", + "oauth2_required_claim_name": "", + "oauth2_required_claim_value": "", + "oauth2_group_claim_name": "", + "oauth2_admin_group": "", + "oauth2_restricted_group": "", + "sspi_auto_create_users": "on", + "sspi_auto_activate_users": "on", + "sspi_strip_domain_names": "on", + "sspi_separator_replacement": "_", + "sspi_default_language": "", + "is_sync_enabled": "on", + "is_active": "on", + } + + resp = self.c.post(self.get_uri("/admin/auths/new"), data=payload) + + +def cli(): + parser = argparse.ArguementParser(description="Install and Bootstrap Gitea") + parser.add_arguement("username", type=str, help="Gitea user's username") + parser.add_arguement("password", type=str, help="Gitea user's password") + parser.add_arguement("email", type=str, help="Gitea user's email") + args = parser.parse_args() + + +if __name__ == "__main__": + print("hello") + + # args = cli() + + +# gitea = Gitea( +# host=HOST, +# username=GITEA_USER, +# password=GITEA_PASSWORD, +# email=GITEA_EMAIL, +# c=Session(), +# ) +# gitea.check_online() +# print("Instace online") +# gitea.install() +# gitea.register() +# gitea.login() +# gitea.create_repository(name="support") +# client_id = "" +# client_secret = "" +# sso_auto_discovery_url = "" +# gitea.install_sso( +# sso_name="Hostea OIDC", +# client_id=client_id, +# client_secret=client_secret, +# sso_auto_discovery_url=sso_auto_discovery_url, +# ) + + +class ParseSSOLogin(HTMLParser): + url: str = None + + def handle_starttag(self, tag: str, attrs: (str, str)): + if self.url: + return + + if tag != "a": + return + + token = None + for (index, (k, v)) in enumerate(attrs): + if k == "href": + if "/user/oauth2/" in v: + self.url = v + return + + +class GiteaSSO: + def __init__( + self, + username: str, + email: str, + gitea_host: str, + hostea_org: str, + support_repo: str, + c: Session, + ): + self.c = c + self.username = username + self.gitea_host = gitea_host + self.hostea_org = hostea_org + self.support_repo = support_repo + self.email = email + + self.__csrf_key = "_csrf" + + url = urlparse(self.gitea_host) + repo = f"{self.hostea_org}/{self.support_repo}" + issues = f"{repo}/issues" + new_issues = f"{issues}/new" + + self.__partial_call_back_url = urlunparse( + (url.scheme, url.netloc, "/user/oauth2/", "", "", "") + ) + self.__login = urlunparse((url.scheme, url.netloc, "/user/login/", "", "", "")) + self.__link_acount = urlunparse( + (url.scheme, url.netloc, "/user/link_account/", "", "", "") + ) + self.__link_acount_signup = urlunparse( + (url.scheme, url.netloc, "/user/link_account_signup/", "", "", "") + ) + + self.__me = urlunparse( + (url.scheme, url.netloc, f"/{self.username}", "", "", "") + ) + + self.issues_uri = urlunparse((url.scheme, url.netloc, issues, "", "", "")) + self.new_issues_uri = urlunparse( + (url.scheme, url.netloc, new_issues, "", "", "") + ) + + def get_csrf(self, url: str) -> str: + resp = self.c.get(url) + parser = ParseCSRF(name=self.__csrf_key) + parser.feed(resp.text) + return parser.token + + def _sso_login(self): + resp = self.c.get(self.__login) + parser = ParseSSOLogin() + parser.feed(resp.text) + + url = urlparse(self.gitea_host) + ## SSO URL in Gitea login page + sso = urlunparse((url.scheme, url.netloc, parser.url, "", "", "")) + + # redirects are enabled to for a cleaner implementation. Commented out + # code below does the same in a step-by-step manner + resp = self.c.get(sso) + + # resp = c.get(sso, allow_redirects=False) + # ## Visiting SSO URL redirects the user with HTTP 307 to the SSO for authorization + # assert resp.status_code == 307 + + # resp = c.get(resp.headers["Location"], allow_redirects=False) + + # assert self.__partial_call_back_url in resp.headers["Location"] + # resp = c.get(resp.headers["Location"], allow_redirects=False) + # assert resp.status_code == 303 + # assert resp.headers["Location"] in self.__link_acount + + # to register account, the user has to visit form at self.__link_acount + csrf = self.get_csrf(self.__link_acount) + # which makes a POST request to self.__link_acount_signup + # weird, but have to go to above URL to collect CSRF toekn + payload = { + "user_name": self.username, + "email": self.email, + self.__csrf_key: csrf, + } + + resp = self.c.post(self.__link_acount_signup, payload, allow_redirects=True) + assert resp.status_code == 200 + # redirect mechanisms seems to change for every version + # print(resp.status_code) + # print(resp.headers["Location"]) + # assert resp.status_code == 303 + # assert resp.headers["Location"] == self.new_issues_uri + + resp = self.c.get(self.__me) + assert resp.status_code == 200 + assert self.username in resp.text + + def new_issue(self): + resp = self.c.get(self.new_issues_uri, allow_redirects=False) + resp.status_code = 303 + assert "/user/login" in resp.headers["Location"] + + self._sso_login() + resp = self.c.get(self.new_issues_uri, allow_redirects=False) + assert resp.status_code == 200 diff --git a/integration/hostea.py b/integration/hostea.py new file mode 100644 index 0000000..b8115aa --- /dev/null +++ b/integration/hostea.py @@ -0,0 +1,129 @@ +import logging +from urllib.parse import urlparse, urlunparse +from html.parser import HTMLParser +from time import sleep + +from requests import Session +import requests + +from .csrf import ParseCSRF + + +class Hostea: + def __init__(self, username: str, email: str, password: str, host: str, c: Session): + self.username = username + self.email = email + self.password = password + self.csrf_key = "csrfmiddlewaretoken" + self.host = host + self.c = c + + @staticmethod + def check_online(dashboard_host: str, maildev_host: str): + """ + Check if Hostea Dashboard is online + """ + count = 0 + dash_parsed = urlparse(dashboard_host) + maildev_parsed = urlparse(maildev_host) + urls = [ + urlunparse((dash_parsed.scheme, dash_parsed.netloc, "/login/", "", "", "")), + urlunparse((maildev_parsed.scheme, maildev_parsed.netloc, "", "", "", "")), + ] + + for url in urls: + while True: + try: + res = requests.get(url, allow_redirects=False) + if any([res.status_code == 302, res.status_code == 200]): + break + except Exception as e: + sleep(2) + print(e) + print(f"[Hostea] Retrying {count} time for {url}") + count += 1 + continue + + def get_uri(self, path: str): + parsed = urlparse(self.host) + return urlunparse((parsed.scheme, parsed.netloc, path, "", "", "")) + + def get_csrf(self, url: str) -> str: + resp = self.c.get(url=url) + assert resp.status_code == 200 + parser = ParseCSRF(name=self.csrf_key) + parser.feed(resp.text) + csrf = parser.token + return csrf + + def __get_verification_link(self, maildev_host: str): + def maildev_uri(maildev_host: str, path: str): + parsed = urlparse(maildev_host) + return urlunparse((parsed.scheme, parsed.netloc, path, "", "", "")) + + resp = self.c.get(maildev_uri(maildev_host=maildev_host, path="/email/")) + # resp = self.c.get("http://localhost:1080/email/") + emails = resp.json() + for email in emails: + if email["to"][0]["address"] == self.email: + logging.info("[Dashboard] Found verification link") + resp = self.c.delete( + maildev_uri(maildev_host=maildev_host, path=f"/email/{email['id']}") + ) + return str.strip(email["text"].split("\n")[1]) + logging.critical("[Dashboard] Verification link not found") + + def register(self, maildev_host: str): + url = self.get_uri("/register/") + csrf = self.get_csrf(url) + payload = { + "username": self.username, + "password": self.password, + "email": self.email, + "confirm_password": self.password, + self.csrf_key: csrf, + } + + logging.info("Registering user") + resp = self.c.post(url, payload, allow_redirects=False) + assert resp.status_code == 302 + assert "pending" in resp.headers["Location"] + + email_verification_link = self.__get_verification_link( + maildev_host=maildev_host + ) + csrf = self.get_csrf(email_verification_link) + payload = { + self.csrf_key: csrf, + } + resp = self.c.post(email_verification_link, payload, allow_redirects=False) + assert resp.status_code == 302 + assert resp.headers["Location"] == "/login/" + logging.info("[Dashboard] Email verified user") + + def login(self): + url = self.get_uri("/login/") + + csrf = self.get_csrf(url) + payload = { + "login": self.username, + "password": self.password, + self.csrf_key: csrf, + } + + logging.info("Logging In user") + resp = self.c.post(url, payload, allow_redirects=False) + + assert resp.status_code == 302 + assert resp.headers["Location"] == "/" + + resp = self.c.get(self.get_uri("/support/new/")) + assert resp.status_code == 200 + + def new_ticket(self, support_repository_new_issue: str): + resp = self.c.get(self.get_uri("/support/new/")) + + +# print(resp.text) +# print(support_repository_new_issue) +# assert support_repository_new_issue in resp.text diff --git a/integration/tests.py b/integration/tests.py deleted file mode 100644 index 7613446..0000000 --- a/integration/tests.py +++ /dev/null @@ -1,276 +0,0 @@ -import logging -from urllib.parse import urlparse, urlunparse -from html.parser import HTMLParser - -from requests import Session - -logging.basicConfig(level=logging.INFO) - - -c = Session() - - -class ParseCSRF(HTMLParser): - token: str = None - - def __init__(self, name): - HTMLParser.__init__(self) - self.name = name - - @classmethod - def dashboard_parser(cls) -> "ParseCSRF": - return cls(name="csrfmiddlewaretoken") - - @classmethod - def gitea_parser(cls) -> "ParseCSRF": - return cls(name="_csrf") - - def handle_starttag(self, tag: str, attrs: (str, str)): - if self.token: - return - - if tag != "input": - return - - token = None - for (index, (k, v)) in enumerate(attrs): - if k == "value": - token = v - - if all([k == "name", v == self.name]): - if token: - self.token = token - return - for (inner_index, (nk, nv)) in enumerate(attrs, start=index): - if nk == "value": - self.token = nv - return - - -class ParseSSOLogin(HTMLParser): - url: str = None - - def handle_starttag(self, tag: str, attrs: (str, str)): - if self.url: - return - - if tag != "a": - return - - token = None - for (index, (k, v)) in enumerate(attrs): - if k == "href": - if "/user/oauth2/" in v: - self.url = v - return - - -class Hostea: - def __init__(self, username: str, email: str, password: str, url: str): - self.username = username - self.email = email - self.password = password - self.url = urlparse(url) - self.csrf_key = "csrfmiddlewaretoken" - - def get_csrf(self, url: str) -> str: - resp = c.get(url) - assert resp.status_code == 200 - parser = ParseCSRF(name=self.csrf_key) - parser.feed(resp.text) - csrf = parser.token - return csrf - - def __get_verification_link(self): - resp = c.get("http://localhost:1080/email/") - emails = resp.json() - for email in emails: - if email["to"][0]["address"] == self.email: - logging.info("[Dashboard] Found verification link") - resp = c.delete(f"http://localhost:1080/email/{email['id']}") - return str.strip(email["text"].split("\n")[1]) - logging.critical("[Dashboard] Verification link not found") - - def register(self): - url = urlunparse((self.url.scheme, self.url.netloc, "/register/", "", "", "")) - csrf = self.get_csrf(url) - payload = { - "username": self.username, - "password": self.password, - "email": self.email, - "confirm_password": self.password, - self.csrf_key: csrf, - } - - logging.info("Registering user") - resp = c.post(url, payload, allow_redirects=False) - assert resp.status_code == 302 - assert "pending" in resp.headers["Location"] - - email_verification_link = self.__get_verification_link() - csrf = self.get_csrf(email_verification_link) - payload = { - self.csrf_key: csrf, - } - resp = c.post(email_verification_link, payload, allow_redirects=False) - assert resp.status_code == 302 - assert resp.headers["Location"] == "/login/" - logging.info("[Dashboard] Email verified user") - - def login(self): - url = urlunparse((self.url.scheme, self.url.netloc, "/login/", "", "", "")) - - csrf = self.get_csrf(url) - payload = { - "login": self.username, - "password": self.password, - self.csrf_key: csrf, - } - - logging.info("Logging In user") - resp = c.post(url, payload, allow_redirects=False) - - assert resp.status_code == 302 - assert resp.headers["Location"] == "/" - - url = urlunparse( - (self.url.scheme, self.url.netloc, "/support/new/", "", "", "") - ) - - resp = c.get(url) - assert resp.status_code == 200 - - def new_ticket(self, support_repository_new_issue: str): - url = urlunparse( - (self.url.scheme, self.url.netloc, "/support/new/", "", "", "") - ) - resp = c.get(url) - assert support_repository_new_issue in resp.text - - -class Gitea: - def __init__( - self, - username: str, - email: str, - gitea_host: str, - hostea_org: str, - support_repo: str, - ): - self.username = username - self.gitea_host = gitea_host - self.hostea_org = hostea_org - self.support_repo = support_repo - self.email = email - - self.__csrf_key = "_csrf" - - url = urlparse(self.gitea_host) - repo = f"{self.hostea_org}/{self.support_repo}" - issues = f"{repo}/issues" - new_issues = f"{issues}/new" - - self.__partial_call_back_url = urlunparse( - (url.scheme, url.netloc, "/user/oauth2/", "", "", "") - ) - self.__login = urlunparse((url.scheme, url.netloc, "/user/login/", "", "", "")) - self.__link_acount = urlunparse( - (url.scheme, url.netloc, "/user/link_account/", "", "", "") - ) - self.__link_acount_signup = urlunparse( - (url.scheme, url.netloc, "/user/link_account_signup/", "", "", "") - ) - - self.__me = urlunparse( - (url.scheme, url.netloc, f"/{self.username}", "", "", "") - ) - - self.issues_uri = urlunparse((url.scheme, url.netloc, issues, "", "", "")) - self.new_issues_uri = urlunparse( - (url.scheme, url.netloc, new_issues, "", "", "") - ) - - def get_csrf(self, url: str) -> str: - resp = c.get(url) - parser = ParseCSRF.gitea_parser() - parser.feed(resp.text) - return parser.token - - def _sso_login(self): - resp = c.get(self.__login) - parser = ParseSSOLogin() - parser.feed(resp.text) - - url = urlparse(self.gitea_host) - ## SSO URL in Gitea login page - sso = urlunparse((url.scheme, url.netloc, parser.url, "", "", "")) - - # redirects are enabled to for a cleaner implementation. Commented out - # code below does the same in a step-by-step manner - resp = c.get(sso) - - # resp = c.get(sso, allow_redirects=False) - # ## Visiting SSO URL redirects the user with HTTP 307 to the SSO for authorization - # assert resp.status_code == 307 - - # resp = c.get(resp.headers["Location"], allow_redirects=False) - - # assert self.__partial_call_back_url in resp.headers["Location"] - # resp = c.get(resp.headers["Location"], allow_redirects=False) - # assert resp.status_code == 303 - # assert resp.headers["Location"] in self.__link_acount - - # to register account, the user has to visit form at self.__link_acount - csrf = self.get_csrf(self.__link_acount) - # which makes a POST request to self.__link_acount_signup - # weird, but have to go to above URL to collect CSRF toekn - payload = { - "user_name": self.username, - "email": self.email, - self.__csrf_key: csrf, - } - - resp = c.post(self.__link_acount_signup, payload, allow_redirects=False) - assert resp.status_code == 303 - assert resp.headers["Location"] == "/Hostea/support/issues/new" - - resp = c.get(self.__me) - assert resp.status_code == 200 - assert self.username in resp.text - - def new_issue(self): - resp = c.get(self.new_issues_uri, allow_redirects=False) - resp.status_code = 303 - assert "/user/login" in resp.headers["Location"] - - self._sso_login() - resp = c.get(self.new_issues_uri, allow_redirects=False) - assert resp.status_code == 200 - - -def main(): - - dash = Hostea( - username="enough", - email="enough@example.org", - password="asdfas234234vaa", - url="http://localhost:8000", - ) - gitea = Gitea( - gitea_host="http://localhost:8080", - username=dash.username, - email=dash.email, - hostea_org="Hostea", - support_repo="support", - ) - dash.register() - dash.login() - dash.new_ticket(gitea.new_issues_uri) - - gitea.new_issue() - - logging.info("All tests passed") - - -if __name__ == "__main__": - main() diff --git a/integration/tests.sh b/integration/tests.sh new file mode 100755 index 0000000..1527648 --- /dev/null +++ b/integration/tests.sh @@ -0,0 +1,149 @@ +#!/bin/bash + +set -Exeuo pipefail + +readonly DASHBOARD_URL="http://localhost:8000" +readonly GITEA_URL="http://localhost:3000" +readonly MAILDEV_URL="http://localhost:1080" +readonly DASHBOARD_OIDC_DISCOVERY_URL="$DASHBOARD_URL/o/.well-known/openid-configuration/" + +readonly DASHBOARD_ADMIN_USERNAME=root +readonly DASHBOARD_ADMIN_PASSWORD=supercomplicatedpassword +readonly DASHBOARD_ADMIN_EMAIL="$DASHBOARD_ADMIN_USERNAME@dash.example.org" +readonly DASHBOARD_OIDC_APP_NAME=hostea-gitea + +readonly GITEA_ROOT_USERNAME=root +readonly GITEA_ROOT_EMAIL="$GITEA_ROOT_USERNAME@example.org" +readonly GITEA_ROOT_PASSOWRD=supercomplicatedpassword +readonly GITEA_HOSTEA_SSO_NAME=hostea-sso +readonly GITEA_OIDC_CALLBACK="$GITEA_URL/user/oauth2/$GITEA_HOSTEA_SSO_NAME/callback" + +readonly GITEA_HOSTEA_USERNAME=hostea +readonly GITEA_HOSTEA_PASSWORD=supercomplicatedpassword +readonly GITEA_HOSTEA_EMAIL="$GITEA_HOSTEA_USERNAME@example.org" +readonly GITEA_HOSTEA_SUPPORT_REPO="support" + +readonly HOSTEA_CUSTOMER_USERNAME=batman +readonly HOSTEA_CUSTOMER_PASSWORD=supercomplicatedpassword +readonly HOSTEA_CUSTOMER_EMAIL="$HOSTEA_CUSTOMER_USERNAME@example.org" + +OIDC_CLIENT_ID="" +OIDC_CLIENT_SECRET="" + +wait_for_env() { + python -m integration \ + check_env $GITEA_URL $DASHBOARD_URL $MAILDEV_URL +} + +# create OIDC app on Hostea Dashboard +oidc_dashboard_init() { + python -m integration \ + hostea register \ + $DASHBOARD_ADMIN_USERNAME $DASHBOARD_ADMIN_PASSWORD \ + $DASHBOARD_ADMIN_EMAIL \ + $DASHBOARD_URL \ + $MAILDEV_URL + + resp=$(python manage.py create_oidc \ + $DASHBOARD_OIDC_APP_NAME $DASHBOARD_ADMIN_USERNAME \ + $GITEA_OIDC_CALLBACK) + OIDC_CLIENT_ID=$(echo $resp | cut -d ":" -f 2 | cut -d " " -f 2) + OIDC_CLIENT_SECRET=$(echo $resp | cut -d ":" -f 3 | cut -d " " -f 2) +} + +# register root user on Gitea to simulate Hoste admin and integrate SSO +gitea_root(){ + python -m integration \ + gitea install \ + $GITEA_ROOT_USERNAME $GITEA_ROOT_PASSOWRD \ + $GITEA_ROOT_EMAIL \ + $GITEA_URL + python -m integration \ + gitea register \ + $GITEA_ROOT_USERNAME $GITEA_ROOT_PASSOWRD \ + $GITEA_ROOT_EMAIL \ + $GITEA_URL + python -m integration \ + gitea login \ + $GITEA_ROOT_USERNAME $GITEA_ROOT_PASSOWRD \ + $GITEA_ROOT_EMAIL \ + $GITEA_URL + python -m integration \ + gitea install_sso \ + $GITEA_ROOT_USERNAME $GITEA_ROOT_PASSOWRD \ + $GITEA_ROOT_EMAIL \ + $GITEA_URL \ + $GITEA_HOSTEA_SSO_NAME \ + $OIDC_CLIENT_ID $OIDC_CLIENT_SECRET \ + $DASHBOARD_OIDC_DISCOVERY_URL +} + + +# register user "Hostea" on Gitea and create support repository +support_repo_init() { + python -m integration \ + gitea register \ + $GITEA_HOSTEA_USERNAME $GITEA_HOSTEA_PASSWORD \ + $GITEA_HOSTEA_EMAIL \ + $GITEA_URL + python -m integration \ + gitea login \ + $GITEA_HOSTEA_USERNAME $GITEA_HOSTEA_PASSWORD \ + $GITEA_HOSTEA_EMAIL \ + $GITEA_URL + python -m integration \ + gitea create_repo \ + $GITEA_HOSTEA_USERNAME $GITEA_HOSTEA_PASSWORD \ + $GITEA_HOSTEA_EMAIL \ + $GITEA_URL \ + $GITEA_HOSTEA_SUPPORT_REPO +} + +# Create user on Hostea to simulate a Hostea customer +hostea_customer_simulation() { + python -m integration \ + hostea register \ + $HOSTEA_CUSTOMER_USERNAME $HOSTEA_CUSTOMER_PASSWORD \ + $HOSTEA_CUSTOMER_EMAIL \ + $DASHBOARD_URL \ + $MAILDEV_URL + python -m integration \ + hostea login \ + $HOSTEA_CUSTOMER_USERNAME $HOSTEA_CUSTOMER_PASSWORD \ + $HOSTEA_CUSTOMER_EMAIL $DASHBOARD_URL + python -m integration \ + hostea support \ + $HOSTEA_CUSTOMER_USERNAME $HOSTEA_CUSTOMER_PASSWORD \ + $HOSTEA_CUSTOMER_EMAIL \ + $DASHBOARD_URL \ + $GITEA_URL \ + $GITEA_HOSTEA_USERNAME $GITEA_HOSTEA_SUPPORT_REPO +} + +SERVER_PID="" + +setup_env() { + nohup python manage.py runserver > /dev/null 2>&1 & + SERVER_PID=$! + docker-compose -f docker-compose-dev-deps.yml up --detach +} + +teardown_env() { + docker-compose -f docker-compose-dev-deps.yml down --remove-orphans + kill $SERVER_PID +} + +main() { + teardown_env || true + setup_env + wait_for_env + oidc_dashboard_init + echo "OIDC APP initialized. CLIENT_ID: $OIDC_CLIENT_ID CLIENT SECRET: $OIDC_CLIENT_SECRET" + gitea_root + support_repo_init + hostea_customer_simulation + teardown_env + echo "All Good! :)" +} + +main