"""Integration check for the Hostea dashboard and its Gitea single sign-on.

The script registers a dashboard user, confirms the e-mail address through an
HTTP mail API, logs in, and then follows the SSO link found on Gitea's login
page to reach the support repository's "new issue" form.  Every step is
verified with ``assert``, so the script must not be run with ``python -O``.
"""
import logging
from html.parser import HTMLParser
from typing import List, Optional, Tuple
from urllib.parse import urlparse, urlunparse

from requests import Session

logging.basicConfig(level=logging.INFO)

# One shared session so cookies (login state, CSRF cookies) persist between
# the dashboard and Gitea requests.
c = Session()

# Shape of the ``attrs`` argument HTMLParser passes to ``handle_starttag``.
_Attrs = List[Tuple[str, Optional[str]]]


class ParseCSRF(HTMLParser):
    """Extract a CSRF token from an ``<input>`` element of an HTML page.

    After ``feed()``, ``token`` holds the ``value`` attribute of an
    ``<input>`` whose ``name`` equals the configured field name, or ``None``
    when no such element was seen.  Once a non-empty token has been captured,
    later tags are ignored.
    """

    token: Optional[str] = None

    def __init__(self, name: str):
        """Create a parser looking for the form field called ``name``."""
        super().__init__()
        self.name = name

    @classmethod
    def dashboard_parser(cls) -> "ParseCSRF":
        """Return a parser for the dashboard's ``csrfmiddlewaretoken`` field."""
        return cls(name="csrfmiddlewaretoken")

    @classmethod
    def gitea_parser(cls) -> "ParseCSRF":
        """Return a parser for Gitea's ``_csrf`` field."""
        return cls(name="_csrf")

    def handle_starttag(self, tag: str, attrs: _Attrs):
        """Record the token when ``tag`` is the matching ``<input>``."""
        if self.token or tag != "input":
            return
        # Collapse the attribute list into a mapping; the first occurrence of
        # a repeated attribute wins.  The order of ``name`` and ``value``
        # inside the tag therefore does not matter.
        attributes = {}
        for key, value in attrs:
            attributes.setdefault(key, value)
        if "value" in attributes and attributes.get("name") == self.name:
            self.token = attributes["value"]


class ParseSSOLogin(HTMLParser):
    """Find the first ``<a href=...>`` whose target contains ``/user/oauth2/``.

    After ``feed()``, ``url`` holds that ``href`` value, or ``None`` when the
    page contains no such link.
    """

    url: Optional[str] = None

    def handle_starttag(self, tag: str, attrs: _Attrs):
        """Record the ``href`` of the first OAuth2 login link."""
        if self.url or tag != "a":
            return
        for key, value in attrs:
            # ``value`` is None for a bare ``href`` attribute; skip those.
            if key == "href" and value and "/user/oauth2/" in value:
                self.url = value
                return


class Hostea:
    """Drives the dashboard: registration, e-mail verification and login."""

    def __init__(
        self,
        username: str,
        email: str,
        password: str,
        url: str,
        mail_api_url: str = "http://localhost:1080/email/",
    ):
        """Store the credentials and the dashboard location.

        Args:
            username: Account name to register and log in with.
            email: Address the verification mail is sent to.
            password: Account password.
            url: Base URL of the dashboard; only scheme and host are used.
            mail_api_url: HTTP endpoint that lists received e-mails as JSON
                and deletes one at ``<endpoint>/<id>``.
        """
        self.username = username
        self.email = email
        self.password = password
        self.url = urlparse(url)
        self.mail_api_url = mail_api_url
        self.csrf_key = "csrfmiddlewaretoken"

    def _endpoint(self, path: str) -> str:
        """Return the absolute dashboard URL for ``path``."""
        return urlunparse((self.url.scheme, self.url.netloc, path, "", "", ""))

    def get_csrf(self, url: str) -> Optional[str]:
        """Fetch ``url`` and return the CSRF token embedded in its form.

        Returns ``None`` when the page has no ``self.csrf_key`` input.
        """
        resp = c.get(url)
        assert resp.status_code == 200
        parser = ParseCSRF(name=self.csrf_key)
        parser.feed(resp.text)
        return parser.token

    def __get_verification_link(self) -> str:
        """Return the verification link mailed to ``self.email``.

        The link is taken from the second line of the message text, and the
        message is deleted from the mail API once found.

        Raises:
            RuntimeError: No message addressed to ``self.email`` exists.
        """
        inbox = c.get(self.mail_api_url).json()
        for message in inbox:
            if message["to"][0]["address"] != self.email:
                continue
            logging.info("[Dashboard] Found verification link")
            c.delete(f"{self.mail_api_url.rstrip('/')}/{message['id']}")
            return message["text"].split("\n")[1].strip()
        logging.critical("[Dashboard] Verification link not found")
        # Fail here instead of returning None, which would only surface later
        # as an obscure error when the missing link is requested.
        raise RuntimeError(f"no verification e-mail found for {self.email}")

    def register(self):
        """Register the user and confirm the e-mail address."""
        url = self._endpoint("/register/")
        payload = {
            "username": self.username,
            "password": self.password,
            "email": self.email,
            "confirm_password": self.password,
            self.csrf_key: self.get_csrf(url),
        }
        logging.info("Registering user")
        resp = c.post(url, payload, allow_redirects=False)
        assert resp.status_code == 302
        assert "pending" in resp.headers["Location"]

        # The verification page is itself a CSRF-protected form that has to
        # be submitted to complete the verification.
        verification_link = self.__get_verification_link()
        payload = {self.csrf_key: self.get_csrf(verification_link)}
        resp = c.post(verification_link, payload, allow_redirects=False)
        assert resp.status_code == 302
        assert resp.headers["Location"] == "/login/"
        logging.info("[Dashboard] Email verified user")

    def login(self):
        """Log in and check that the support page is reachable afterwards."""
        url = self._endpoint("/login/")
        payload = {
            "login": self.username,
            "password": self.password,
            self.csrf_key: self.get_csrf(url),
        }
        logging.info("Logging In user")
        resp = c.post(url, payload, allow_redirects=False)
        assert resp.status_code == 302
        assert resp.headers["Location"] == "/"

        resp = c.get(self._endpoint("/support/new/"))
        assert resp.status_code == 200

    def new_ticket(self, support_repository_new_issue: str):
        """Check that the support page links to the given new-issue URL."""
        resp = c.get(self._endpoint("/support/new/"))
        assert support_repository_new_issue in resp.text


class Gitea:
    """Drives Gitea: SSO sign-up and access to the support repository."""

    def __init__(
        self,
        username: str,
        email: str,
        gitea_host: str,
        hostea_org: str,
        support_repo: str,
    ):
        """Store the account details and precompute the Gitea URLs used.

        Args:
            username: Gitea user name to create through account linking.
            email: E-mail address for the linked account.
            gitea_host: Base URL of Gitea; only scheme and host are used.
            hostea_org: Organisation owning the support repository.
            support_repo: Name of the support repository.
        """
        self.username = username
        self.gitea_host = gitea_host
        self.hostea_org = hostea_org
        self.support_repo = support_repo
        self.email = email
        self.__csrf_key = "_csrf"
        self.__host = urlparse(self.gitea_host)

        issues = f"{self.hostea_org}/{self.support_repo}/issues"
        new_issues = f"{issues}/new"
        # Path Gitea is expected to redirect to once the account is linked.
        self.__new_issue_path = f"/{new_issues}"

        self.__partial_call_back_url = self._endpoint("/user/oauth2/")
        self.__login = self._endpoint("/user/login/")
        self.__link_account = self._endpoint("/user/link_account/")
        self.__link_account_signup = self._endpoint("/user/link_account_signup/")
        self.__me = self._endpoint(f"/{self.username}")
        self.issues_uri = self._endpoint(issues)
        self.new_issues_uri = self._endpoint(new_issues)

    def _endpoint(self, path: str) -> str:
        """Return the absolute Gitea URL for ``path``."""
        return urlunparse((self.__host.scheme, self.__host.netloc, path, "", "", ""))

    def get_csrf(self, url: str) -> Optional[str]:
        """Fetch ``url`` and return its ``_csrf`` token, or ``None`` if absent."""
        resp = c.get(url)
        parser = ParseCSRF.gitea_parser()
        parser.feed(resp.text)
        return parser.token

    def _sso_login(self):
        """Sign in through the SSO link and create the linked Gitea account."""
        resp = c.get(self.__login)
        parser = ParseSSOLogin()
        parser.feed(resp.text)
        assert parser.url is not None, "no SSO link found on the Gitea login page"

        # SSO URL as advertised on the Gitea login page.
        sso = self._endpoint(parser.url)

        # Redirects are followed automatically for a cleaner implementation.
        # Taken one hop at a time (as the step-by-step version this replaces
        # did), the chain is: the SSO URL answers 307 towards the SSO for
        # authorization, which sends the user to a URL containing
        # self.__partial_call_back_url, which answers 303 towards
        # self.__link_account.
        c.get(sso)

        # To register the account the user visits the form at
        # self.__link_account, which POSTs to self.__link_account_signup.
        # The form page must be fetched first to collect the CSRF token.
        payload = {
            "user_name": self.username,
            "email": self.email,
            self.__csrf_key: self.get_csrf(self.__link_account),
        }
        resp = c.post(self.__link_account_signup, payload, allow_redirects=False)
        assert resp.status_code == 303
        assert resp.headers["Location"] == self.__new_issue_path

        resp = c.get(self.__me)
        assert resp.status_code == 200
        assert self.username in resp.text

    def new_issue(self):
        """Check the new-issue page requires login, then opens after SSO."""
        resp = c.get(self.new_issues_uri, allow_redirects=False)
        # An anonymous visitor must be redirected to the login page.
        assert resp.status_code == 303
        assert "/user/login" in resp.headers["Location"]

        self._sso_login()

        resp = c.get(self.new_issues_uri, allow_redirects=False)
        assert resp.status_code == 200


def main():
    """Run the full dashboard-to-Gitea scenario against the local services."""
    dash = Hostea(
        username="enough",
        email="enough@example.org",
        password="asdfas234234vaa",
        url="http://localhost:8000",
    )
    gitea = Gitea(
        gitea_host="http://localhost:8080",
        username=dash.username,
        email=dash.email,
        hostea_org="Hostea",
        support_repo="support",
    )

    dash.register()
    dash.login()
    dash.new_ticket(gitea.new_issues_uri)
    gitea.new_issue()
    logging.info("All tests passed")


if __name__ == "__main__":
    main()