dashboard/integration/tests.py

277 lines
8.2 KiB
Python
Raw Normal View History

2022-06-20 17:46:59 +00:00
import logging
from urllib.parse import urlparse, urlunparse
from html.parser import HTMLParser
from requests import Session
# Show INFO-level progress messages logged by the test steps below.
logging.basicConfig(level=logging.INFO)
# One HTTP session shared by every request in this module; a requests Session
# keeps cookies between calls, so the dashboard and Gitea logins persist.
c = Session()
class ParseCSRF(HTMLParser):
    """Extract a CSRF token from an HTML form.

    After ``feed()``-ing a document, ``token`` holds the ``value`` attribute
    of the first ``<input>`` element whose ``name`` attribute equals the
    configured field name and whose value is non-empty. It stays ``None``
    when no such element carries a ``value`` attribute.
    """

    # Token captured so far; remains None until a matching <input> is seen.
    token: "str | None" = None

    def __init__(self, name):
        """Create a parser that looks for the ``<input>`` named *name*."""
        HTMLParser.__init__(self)
        self.name = name

    @classmethod
    def dashboard_parser(cls) -> "ParseCSRF":
        """Return a parser for the ``csrfmiddlewaretoken`` form field."""
        return cls(name="csrfmiddlewaretoken")

    @classmethod
    def gitea_parser(cls) -> "ParseCSRF":
        """Return a parser for the ``_csrf`` form field."""
        return cls(name="_csrf")

    def handle_starttag(self, tag: str, attrs: "list[tuple[str, str | None]]"):
        """Record the token if *tag* is the ``<input>`` being searched for.

        ``attrs`` is the list of ``(name, value)`` pairs HTMLParser supplies;
        the order of ``name`` and ``value`` within the tag does not matter.
        """
        # Once a non-empty token has been captured, ignore the rest of the page.
        if self.token or tag != "input":
            return
        if not any(key == "name" and value == self.name for key, value in attrs):
            return
        # Take the first ``value`` attribute: for duplicated attributes HTML
        # keeps the first occurrence and ignores the later ones.
        for key, value in attrs:
            if key == "value":
                self.token = value
                return
class ParseSSOLogin(HTMLParser):
    """Find the OAuth2 (SSO) login link in an HTML page.

    After ``feed()``-ing a document, ``url`` holds the ``href`` of the first
    ``<a>`` element whose target contains ``/user/oauth2/``, or ``None`` when
    the page has no such link.
    """

    # Link captured so far; remains None until a matching <a> is seen.
    url: "str | None" = None

    def handle_starttag(self, tag: str, attrs: "list[tuple[str, str | None]]"):
        """Record the first anchor ``href`` that points at ``/user/oauth2/``."""
        if self.url or tag != "a":
            return
        for key, value in attrs:
            # A valueless attribute (``<a href>``) is reported with value None,
            # so guard before the substring test to avoid a TypeError.
            if key == "href" and value and "/user/oauth2/" in value:
                self.url = value
                return
class Hostea:
    """Drive the dashboard through registration, login and the support page.

    Every request goes through the module-level session ``c``. Expectations
    are checked with ``assert``, so a failed step aborts the run.
    """

    # HTTP endpoint listing captured emails.
    # NOTE(review): presumably a local mail-catcher's REST API — confirm.
    MAIL_API = "http://localhost:1080/email/"

    def __init__(self, username: str, email: str, password: str, url: str):
        """Store the test account's credentials and the dashboard base URL."""
        self.username = username
        self.email = email
        self.password = password
        self.url = urlparse(url)
        self.csrf_key = "csrfmiddlewaretoken"

    def _endpoint(self, path: str) -> str:
        """Return the absolute dashboard URL for *path*."""
        return urlunparse((self.url.scheme, self.url.netloc, path, "", "", ""))

    def get_csrf(self, url: str) -> str:
        """Fetch *url* and return the CSRF token found in its form."""
        resp = c.get(url)
        assert resp.status_code == 200
        parser = ParseCSRF(name=self.csrf_key)
        parser.feed(resp.text)
        return parser.token

    def __get_verification_link(self) -> str:
        """Return the verification link mailed to ``self.email``.

        The matching message is deleted from the mail endpoint once read.

        Raises:
            RuntimeError: if no captured email is addressed to ``self.email``.
        """
        resp = c.get(self.MAIL_API)
        for message in resp.json():
            if message["to"][0]["address"] == self.email:
                logging.info("[Dashboard] Found verification link")
                c.delete(f"{self.MAIL_API}{message['id']}")
                # The link is taken from the second line of the plain-text body.
                return message["text"].split("\n")[1].strip()
        logging.critical("[Dashboard] Verification link not found")
        # Fail here instead of returning None, which would only surface later
        # as an obscure error when the missing link is requested.
        raise RuntimeError(f"no verification email found for {self.email}")

    def register(self):
        """Register the account and confirm it via the emailed link."""
        url = self._endpoint("/register/")
        payload = {
            "username": self.username,
            "password": self.password,
            "email": self.email,
            "confirm_password": self.password,
            self.csrf_key: self.get_csrf(url),
        }
        logging.info("Registering user")
        resp = c.post(url, payload, allow_redirects=False)
        assert resp.status_code == 302
        assert "pending" in resp.headers["Location"]

        email_verification_link = self.__get_verification_link()
        payload = {
            self.csrf_key: self.get_csrf(email_verification_link),
        }
        resp = c.post(email_verification_link, payload, allow_redirects=False)
        assert resp.status_code == 302
        assert resp.headers["Location"] == "/login/"
        logging.info("[Dashboard] Email verified user")

    def login(self):
        """Log in and check that the new-ticket page is reachable."""
        url = self._endpoint("/login/")
        payload = {
            "login": self.username,
            "password": self.password,
            self.csrf_key: self.get_csrf(url),
        }
        logging.info("Logging In user")
        resp = c.post(url, payload, allow_redirects=False)
        assert resp.status_code == 302
        assert resp.headers["Location"] == "/"

        resp = c.get(self._endpoint("/support/new/"))
        assert resp.status_code == 200

    def new_ticket(self, support_repository_new_issue: str):
        """Check that the new-ticket page mentions the given new-issue URL."""
        resp = c.get(self._endpoint("/support/new/"))
        assert support_repository_new_issue in resp.text
class Gitea:
    """Drive Gitea through SSO sign-up and the support repository's issues.

    Every request goes through the module-level session ``c``. Expectations
    are checked with ``assert``, so a failed step aborts the run.
    """

    def __init__(
        self,
        username: str,
        email: str,
        gitea_host: str,
        hostea_org: str,
        support_repo: str,
    ):
        """Store the account details and precompute the Gitea URLs used below."""
        self.username = username
        self.gitea_host = gitea_host
        self.hostea_org = hostea_org
        self.support_repo = support_repo
        self.email = email
        self.__csrf_key = "_csrf"

        url = urlparse(self.gitea_host)
        repo = f"{self.hostea_org}/{self.support_repo}"
        issues = f"{repo}/issues"
        new_issues = f"{issues}/new"

        def absolute(path: str) -> str:
            # urlunparse inserts the leading "/" when the path lacks one.
            return urlunparse((url.scheme, url.netloc, path, "", "", ""))

        # Path form of the new-issue page, as it appears in Location headers.
        self.__new_issue_path = f"/{new_issues}"
        # Not used by the live code; documents the OAuth2 callback prefix.
        self.__partial_call_back_url = absolute("/user/oauth2/")
        self.__login = absolute("/user/login/")
        self.__link_account = absolute("/user/link_account/")
        self.__link_account_signup = absolute("/user/link_account_signup/")
        self.__me = absolute(f"/{self.username}")
        self.issues_uri = absolute(issues)
        self.new_issues_uri = absolute(new_issues)

    def get_csrf(self, url: str) -> str:
        """Fetch *url* and return the ``_csrf`` token found in its form."""
        resp = c.get(url)
        parser = ParseCSRF.gitea_parser()
        parser.feed(resp.text)
        return parser.token

    def _sso_login(self):
        """Sign in through the SSO link and register the linked account."""
        resp = c.get(self.__login)
        parser = ParseSSOLogin()
        parser.feed(resp.text)
        # Without this check a missing link fails later inside urlunparse.
        assert parser.url is not None, "SSO link not found on the login page"

        url = urlparse(self.gitea_host)
        # SSO URL taken from the Gitea login page.
        sso = urlunparse((url.scheme, url.netloc, parser.url, "", "", ""))

        # Redirects are followed automatically for a simpler implementation.
        # Per the original step-by-step version of this test, the chain is:
        # 307 to the SSO provider for authorization, a redirect back to the
        # OAuth2 callback URL, then 303 to the link-account page.
        c.get(sso)

        # To register the account the user visits the link-account form,
        # which POSTs to the link-account-signup URL. The CSRF token
        # therefore has to be collected from the form page first.
        csrf = self.get_csrf(self.__link_account)
        payload = {
            "user_name": self.username,
            "email": self.email,
            self.__csrf_key: csrf,
        }
        resp = c.post(self.__link_account_signup, payload, allow_redirects=False)
        assert resp.status_code == 303
        # Sign-up redirects back to the new-issue page of the support repo.
        assert resp.headers["Location"] == self.__new_issue_path

        resp = c.get(self.__me)
        assert resp.status_code == 200
        assert self.username in resp.text

    def new_issue(self):
        """Check the new-issue page requires login, then opens after SSO."""
        resp = c.get(self.new_issues_uri, allow_redirects=False)
        # Anonymous visitors must be redirected to the login page.
        assert resp.status_code == 303
        assert "/user/login" in resp.headers["Location"]

        self._sso_login()

        resp = c.get(self.new_issues_uri, allow_redirects=False)
        assert resp.status_code == 200
def main():
    """Run the end-to-end scenario: dashboard sign-up, then a Gitea issue."""
    # Both services are exercised with the same account identity.
    account = {"username": "enough", "email": "enough@example.org"}

    dashboard = Hostea(
        password="asdfas234234vaa",
        url="http://localhost:8000",
        **account,
    )
    forge = Gitea(
        gitea_host="http://localhost:8080",
        hostea_org="Hostea",
        support_repo="support",
        **account,
    )

    # Dashboard side: create the account, sign in, check the support link.
    dashboard.register()
    dashboard.login()
    dashboard.new_ticket(forge.new_issues_uri)

    # Gitea side: reach the new-issue page through SSO.
    forge.new_issue()

    logging.info("All tests passed")


if __name__ == "__main__":
    main()