From c1577f824cf977fde5b1f241084fa78103d7d3a7 Mon Sep 17 00:00:00 2001 From: realaravinth Date: Mon, 20 Jun 2022 23:16:59 +0530 Subject: [PATCH] feat: Dashboard-Gitea SSO integration test script STEPS 1. Register new user on dashboard 2. Confirm user email, link is received from email. maildev/maildev is an SMTP server specifically built for testing emails locally. It comes with a REST API[0], which is used to access emails 3. Sign in to Dashboard 4. Visit /support/new/ on dashboard to raise new support request 5. Redirection to Hostea Gitea support repository is done via JavaScript, so we simply test to see if the support repository's new issue page is present in the Dashboard response 6. Go to support repository's new issue page. Gitea will redirect to sign in page 7. Parse sign in page, find OIDC SSO link in sign in page 8. Visit OIDC SSO link in sign in page, to be redirected to authorization page 9. If OIDC integration on Dashboard is setup via `create_oidc` management command, then auto-authorization will be enabled for the integration. So user will be redirected to Gitea 10. For new OIDC logins, Gitea will present a form to choose preferred username and enter email address. So fill that form and submit it. Please note the form submits to a different URL than the one at which the form is available. See `Gitea.__link_acount` and `Gitea.__link_acount_signup` and its usage in `Gitea._sso_login` 11. Verify user creation by GET /{username}, should respond HTTP 200 12. Visit new issue on support repository, should respond HTTP 200 RESOURCES [0]: https://github.com/maildev/maildev/blob/master/docs/rest.md --- Makefile | 1 + integration/tests.py | 276 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 277 insertions(+) create mode 100644 integration/tests.py diff --git a/Makefile b/Makefile index 2af4ca2..ef5c582 100644 --- a/Makefile +++ b/Makefile @@ -38,6 +38,7 @@ lint: ## Run linter @./venv/bin/black ./dash/* @./venv/bin/black ./support/* @./venv/bin/black ./billing/* + @./venv/bin/black ./integration/ migrate: ## Run migrations $(call run_migrations) diff --git a/integration/tests.py b/integration/tests.py new file mode 100644 index 0000000..7613446 --- /dev/null +++ b/integration/tests.py @@ -0,0 +1,276 @@ +import logging +from urllib.parse import urlparse, urlunparse +from html.parser import HTMLParser + +from requests import Session + +logging.basicConfig(level=logging.INFO) + + +c = Session() + + +class ParseCSRF(HTMLParser): + token: str = None + + def __init__(self, name): + HTMLParser.__init__(self) + self.name = name + + @classmethod + def dashboard_parser(cls) -> "ParseCSRF": + return cls(name="csrfmiddlewaretoken") + + @classmethod + def gitea_parser(cls) -> "ParseCSRF": + return cls(name="_csrf") + + def handle_starttag(self, tag: str, attrs: (str, str)): + if self.token: + return + + if tag != "input": + return + + token = None + for (index, (k, v)) in enumerate(attrs): + if k == "value": + token = v + + if all([k == "name", v == self.name]): + if token: + self.token = token + return + for (inner_index, (nk, nv)) in enumerate(attrs, start=index): + if nk == "value": + self.token = nv + return + + +class ParseSSOLogin(HTMLParser): + url: str = None + + def handle_starttag(self, tag: str, attrs: (str, str)): + if self.url: + return + + if tag != "a": + return + + token = None + for (index, (k, v)) in enumerate(attrs): + if k == "href": + if "/user/oauth2/" in v: + self.url = v + return + + +class Hostea: + def __init__(self, username: str, email: str, password: str, url: str): + self.username = username + self.email = email + self.password = password + self.url = urlparse(url) + self.csrf_key = "csrfmiddlewaretoken" + + def get_csrf(self, url: str) -> str: + resp = c.get(url) + assert resp.status_code == 200 + parser = ParseCSRF(name=self.csrf_key) + parser.feed(resp.text) + csrf = parser.token + return csrf + + def __get_verification_link(self): + resp = c.get("http://localhost:1080/email/") + emails = resp.json() + for email in emails: + if email["to"][0]["address"] == self.email: + logging.info("[Dashboard] Found verification link") + resp = c.delete(f"http://localhost:1080/email/{email['id']}") + return str.strip(email["text"].split("\n")[1]) + logging.critical("[Dashboard] Verification link not found") + + def register(self): + url = urlunparse((self.url.scheme, self.url.netloc, "/register/", "", "", "")) + csrf = self.get_csrf(url) + payload = { + "username": self.username, + "password": self.password, + "email": self.email, + "confirm_password": self.password, + self.csrf_key: csrf, + } + + logging.info("Registering user") + resp = c.post(url, payload, allow_redirects=False) + assert resp.status_code == 302 + assert "pending" in resp.headers["Location"] + + email_verification_link = self.__get_verification_link() + csrf = self.get_csrf(email_verification_link) + payload = { + self.csrf_key: csrf, + } + resp = c.post(email_verification_link, payload, allow_redirects=False) + assert resp.status_code == 302 + assert resp.headers["Location"] == "/login/" + logging.info("[Dashboard] Email verified user") + + def login(self): + url = urlunparse((self.url.scheme, self.url.netloc, "/login/", "", "", "")) + + csrf = self.get_csrf(url) + payload = { + "login": self.username, + "password": self.password, + self.csrf_key: csrf, + } + + logging.info("Logging In user") + resp = c.post(url, payload, allow_redirects=False) + + assert resp.status_code == 302 + assert resp.headers["Location"] == "/" + + url = urlunparse( + (self.url.scheme, self.url.netloc, "/support/new/", "", "", "") + ) + + resp = c.get(url) + assert resp.status_code == 200 + + def new_ticket(self, support_repository_new_issue: str): + url = urlunparse( + (self.url.scheme, self.url.netloc, "/support/new/", "", "", "") + ) + resp = c.get(url) + assert support_repository_new_issue in resp.text + + +class Gitea: + def __init__( + self, + username: str, + email: str, + gitea_host: str, + hostea_org: str, + support_repo: str, + ): + self.username = username + self.gitea_host = gitea_host + self.hostea_org = hostea_org + self.support_repo = support_repo + self.email = email + + self.__csrf_key = "_csrf" + + url = urlparse(self.gitea_host) + repo = f"{self.hostea_org}/{self.support_repo}" + issues = f"{repo}/issues" + new_issues = f"{issues}/new" + + self.__partial_call_back_url = urlunparse( + (url.scheme, url.netloc, "/user/oauth2/", "", "", "") + ) + self.__login = urlunparse((url.scheme, url.netloc, "/user/login/", "", "", "")) + self.__link_acount = urlunparse( + (url.scheme, url.netloc, "/user/link_account/", "", "", "") + ) + self.__link_acount_signup = urlunparse( + (url.scheme, url.netloc, "/user/link_account_signup/", "", "", "") + ) + + self.__me = urlunparse( + (url.scheme, url.netloc, f"/{self.username}", "", "", "") + ) + + self.issues_uri = urlunparse((url.scheme, url.netloc, issues, "", "", "")) + self.new_issues_uri = urlunparse( + (url.scheme, url.netloc, new_issues, "", "", "") + ) + + def get_csrf(self, url: str) -> str: + resp = c.get(url) + parser = ParseCSRF.gitea_parser() + parser.feed(resp.text) + return parser.token + + def _sso_login(self): + resp = c.get(self.__login) + parser = ParseSSOLogin() + parser.feed(resp.text) + + url = urlparse(self.gitea_host) + ## SSO URL in Gitea login page + sso = urlunparse((url.scheme, url.netloc, parser.url, "", "", "")) + + # redirects are enabled to for a cleaner implementation. Commented out + # code below does the same in a step-by-step manner + resp = c.get(sso) + + # resp = c.get(sso, allow_redirects=False) + # ## Visiting SSO URL redirects the user with HTTP 307 to the SSO for authorization + # assert resp.status_code == 307 + + # resp = c.get(resp.headers["Location"], allow_redirects=False) + + # assert self.__partial_call_back_url in resp.headers["Location"] + # resp = c.get(resp.headers["Location"], allow_redirects=False) + # assert resp.status_code == 303 + # assert resp.headers["Location"] in self.__link_acount + + # to register account, the user has to visit form at self.__link_acount + csrf = self.get_csrf(self.__link_acount) + # which makes a POST request to self.__link_acount_signup + # weird, but have to go to above URL to collect CSRF toekn + payload = { + "user_name": self.username, + "email": self.email, + self.__csrf_key: csrf, + } + + resp = c.post(self.__link_acount_signup, payload, allow_redirects=False) + assert resp.status_code == 303 + assert resp.headers["Location"] == "/Hostea/support/issues/new" + + resp = c.get(self.__me) + assert resp.status_code == 200 + assert self.username in resp.text + + def new_issue(self): + resp = c.get(self.new_issues_uri, allow_redirects=False) + resp.status_code = 303 + assert "/user/login" in resp.headers["Location"] + + self._sso_login() + resp = c.get(self.new_issues_uri, allow_redirects=False) + assert resp.status_code == 200 + + +def main(): + + dash = Hostea( + username="enough", + email="enough@example.org", + password="asdfas234234vaa", + url="http://localhost:8000", + ) + gitea = Gitea( + gitea_host="http://localhost:8080", + username=dash.username, + email=dash.email, + hostea_org="Hostea", + support_repo="support", + ) + dash.register() + dash.login() + dash.new_ticket(gitea.new_issues_uri) + + gitea.new_issue() + + logging.info("All tests passed") + + +if __name__ == "__main__": + main()