diff --git a/Makefile b/Makefile index e44ab37..1dcabd5 100644 --- a/Makefile +++ b/Makefile @@ -39,6 +39,7 @@ lint: ## Run linter @./venv/bin/black ./support/ @./venv/bin/black ./billing/ @./venv/bin/black ./infrastructure/ + @./venv/bin/black ./integration/ migrate: ## Run migrations $(call run_migrations) diff --git a/integration/tests.py b/integration/tests.py new file mode 100644 index 0000000..7613446 --- /dev/null +++ b/integration/tests.py @@ -0,0 +1,276 @@ +import logging +from urllib.parse import urlparse, urlunparse +from html.parser import HTMLParser + +from requests import Session + +logging.basicConfig(level=logging.INFO) + + +c = Session() + + +class ParseCSRF(HTMLParser): + token: str = None + + def __init__(self, name): + HTMLParser.__init__(self) + self.name = name + + @classmethod + def dashboard_parser(cls) -> "ParseCSRF": + return cls(name="csrfmiddlewaretoken") + + @classmethod + def gitea_parser(cls) -> "ParseCSRF": + return cls(name="_csrf") + + def handle_starttag(self, tag: str, attrs: (str, str)): + if self.token: + return + + if tag != "input": + return + + token = None + for (index, (k, v)) in enumerate(attrs): + if k == "value": + token = v + + if all([k == "name", v == self.name]): + if token: + self.token = token + return + for (inner_index, (nk, nv)) in enumerate(attrs, start=index): + if nk == "value": + self.token = nv + return + + +class ParseSSOLogin(HTMLParser): + url: str = None + + def handle_starttag(self, tag: str, attrs: (str, str)): + if self.url: + return + + if tag != "a": + return + + token = None + for (index, (k, v)) in enumerate(attrs): + if k == "href": + if "/user/oauth2/" in v: + self.url = v + return + + +class Hostea: + def __init__(self, username: str, email: str, password: str, url: str): + self.username = username + self.email = email + self.password = password + self.url = urlparse(url) + self.csrf_key = "csrfmiddlewaretoken" + + def get_csrf(self, url: str) -> str: + resp = c.get(url) + assert resp.status_code == 200 + parser = ParseCSRF(name=self.csrf_key) + parser.feed(resp.text) + csrf = parser.token + return csrf + + def __get_verification_link(self): + resp = c.get("http://localhost:1080/email/") + emails = resp.json() + for email in emails: + if email["to"][0]["address"] == self.email: + logging.info("[Dashboard] Found verification link") + resp = c.delete(f"http://localhost:1080/email/{email['id']}") + return str.strip(email["text"].split("\n")[1]) + logging.critical("[Dashboard] Verification link not found") + + def register(self): + url = urlunparse((self.url.scheme, self.url.netloc, "/register/", "", "", "")) + csrf = self.get_csrf(url) + payload = { + "username": self.username, + "password": self.password, + "email": self.email, + "confirm_password": self.password, + self.csrf_key: csrf, + } + + logging.info("Registering user") + resp = c.post(url, payload, allow_redirects=False) + assert resp.status_code == 302 + assert "pending" in resp.headers["Location"] + + email_verification_link = self.__get_verification_link() + csrf = self.get_csrf(email_verification_link) + payload = { + self.csrf_key: csrf, + } + resp = c.post(email_verification_link, payload, allow_redirects=False) + assert resp.status_code == 302 + assert resp.headers["Location"] == "/login/" + logging.info("[Dashboard] Email verified user") + + def login(self): + url = urlunparse((self.url.scheme, self.url.netloc, "/login/", "", "", "")) + + csrf = self.get_csrf(url) + payload = { + "login": self.username, + "password": self.password, + self.csrf_key: csrf, + } + + logging.info("Logging In user") + resp = c.post(url, payload, allow_redirects=False) + + assert resp.status_code == 302 + assert resp.headers["Location"] == "/" + + url = urlunparse( + (self.url.scheme, self.url.netloc, "/support/new/", "", "", "") + ) + + resp = c.get(url) + assert resp.status_code == 200 + + def new_ticket(self, support_repository_new_issue: str): + url = urlunparse( + (self.url.scheme, self.url.netloc, "/support/new/", "", "", "") + ) + resp = c.get(url) + assert support_repository_new_issue in resp.text + + +class Gitea: + def __init__( + self, + username: str, + email: str, + gitea_host: str, + hostea_org: str, + support_repo: str, + ): + self.username = username + self.gitea_host = gitea_host + self.hostea_org = hostea_org + self.support_repo = support_repo + self.email = email + + self.__csrf_key = "_csrf" + + url = urlparse(self.gitea_host) + repo = f"{self.hostea_org}/{self.support_repo}" + issues = f"{repo}/issues" + new_issues = f"{issues}/new" + + self.__partial_call_back_url = urlunparse( + (url.scheme, url.netloc, "/user/oauth2/", "", "", "") + ) + self.__login = urlunparse((url.scheme, url.netloc, "/user/login/", "", "", "")) + self.__link_acount = urlunparse( + (url.scheme, url.netloc, "/user/link_account/", "", "", "") + ) + self.__link_acount_signup = urlunparse( + (url.scheme, url.netloc, "/user/link_account_signup/", "", "", "") + ) + + self.__me = urlunparse( + (url.scheme, url.netloc, f"/{self.username}", "", "", "") + ) + + self.issues_uri = urlunparse((url.scheme, url.netloc, issues, "", "", "")) + self.new_issues_uri = urlunparse( + (url.scheme, url.netloc, new_issues, "", "", "") + ) + + def get_csrf(self, url: str) -> str: + resp = c.get(url) + parser = ParseCSRF.gitea_parser() + parser.feed(resp.text) + return parser.token + + def _sso_login(self): + resp = c.get(self.__login) + parser = ParseSSOLogin() + parser.feed(resp.text) + + url = urlparse(self.gitea_host) + ## SSO URL in Gitea login page + sso = urlunparse((url.scheme, url.netloc, parser.url, "", "", "")) + + # redirects are enabled to for a cleaner implementation. Commented out + # code below does the same in a step-by-step manner + resp = c.get(sso) + + # resp = c.get(sso, allow_redirects=False) + # ## Visiting SSO URL redirects the user with HTTP 307 to the SSO for authorization + # assert resp.status_code == 307 + + # resp = c.get(resp.headers["Location"], allow_redirects=False) + + # assert self.__partial_call_back_url in resp.headers["Location"] + # resp = c.get(resp.headers["Location"], allow_redirects=False) + # assert resp.status_code == 303 + # assert resp.headers["Location"] in self.__link_acount + + # to register account, the user has to visit form at self.__link_acount + csrf = self.get_csrf(self.__link_acount) + # which makes a POST request to self.__link_acount_signup + # weird, but have to go to above URL to collect CSRF toekn + payload = { + "user_name": self.username, + "email": self.email, + self.__csrf_key: csrf, + } + + resp = c.post(self.__link_acount_signup, payload, allow_redirects=False) + assert resp.status_code == 303 + assert resp.headers["Location"] == "/Hostea/support/issues/new" + + resp = c.get(self.__me) + assert resp.status_code == 200 + assert self.username in resp.text + + def new_issue(self): + resp = c.get(self.new_issues_uri, allow_redirects=False) + resp.status_code = 303 + assert "/user/login" in resp.headers["Location"] + + self._sso_login() + resp = c.get(self.new_issues_uri, allow_redirects=False) + assert resp.status_code == 200 + + +def main(): + + dash = Hostea( + username="enough", + email="enough@example.org", + password="asdfas234234vaa", + url="http://localhost:8000", + ) + gitea = Gitea( + gitea_host="http://localhost:8080", + username=dash.username, + email=dash.email, + hostea_org="Hostea", + support_repo="support", + ) + dash.register() + dash.login() + dash.new_ticket(gitea.new_issues_uri) + + gitea.new_issue() + + logging.info("All tests passed") + + +if __name__ == "__main__": + main()