# Listing metadata from the file viewer this was copied from: Python, 277 lines, 8.2 KiB
|
import logging
|
||
|
from urllib.parse import urlparse, urlunparse
|
||
|
from html.parser import HTMLParser
|
||
|
|
||
|
from requests import Session
|
||
|
|
||
|
# Configure the root logger at INFO so the progress messages logged below are shown.
logging.basicConfig(level=logging.INFO)

# Single module-level requests Session used for the HTTP calls in this file
# (presumably so cookies set by one request are sent with the next -- confirm).
c = Session()
|
||
|
|
||
|
|
||
|
class ParseCSRF(HTMLParser):
    """Pull a CSRF token out of an HTML page.

    After ``feed()``-ing a page, ``token`` holds the ``value`` attribute of an
    ``<input>`` element whose ``name`` attribute equals ``self.name``, or
    ``None`` if no such element was seen. Once a non-empty token has been
    stored, all later tags are ignored.
    """

    # Token found so far; stays None until a matching <input> is parsed.
    token: "str | None" = None

    def __init__(self, name):
        """Create a parser that looks for the input field called *name*."""
        HTMLParser.__init__(self)
        self.name = name

    @classmethod
    def dashboard_parser(cls) -> "ParseCSRF":
        """Return a parser for the ``csrfmiddlewaretoken`` field."""
        return cls(name="csrfmiddlewaretoken")

    @classmethod
    def gitea_parser(cls) -> "ParseCSRF":
        """Return a parser for the ``_csrf`` field."""
        return cls(name="_csrf")

    def handle_starttag(self, tag: str, attrs: "list[tuple[str, str | None]]"):
        """Record the token when *tag* is the ``<input>`` we are looking for.

        ``attrs`` is the list of ``(name, value)`` pairs HTMLParser passes in;
        the ``name`` and ``value`` attributes may appear in either order.
        """
        if self.token or tag != "input":
            return

        # Only the input whose name matches carries the token.
        if not any(key == "name" and value == self.name for key, value in attrs):
            return

        # Take the first ``value`` attribute of that same tag, wherever it
        # sits relative to ``name``.
        for key, value in attrs:
            if key == "value":
                self.token = value
                return
|
||
|
|
||
|
|
||
|
class ParseSSOLogin(HTMLParser):
    """Find the first ``<a>`` whose ``href`` contains ``/user/oauth2/``.

    After ``feed()``-ing a page, ``url`` holds that ``href`` value, or ``None``
    when the page has no such link.
    """

    # href of the SSO link found so far; None until one is parsed.
    url: "str | None" = None

    def handle_starttag(self, tag: str, attrs: "list[tuple[str, str | None]]"):
        """Store the ``href`` of the first anchor pointing at ``/user/oauth2/``."""
        if self.url or tag != "a":
            return

        for key, value in attrs:
            # HTMLParser reports a valueless attribute (``<a href>``) as None,
            # so guard before the substring test.
            if key == "href" and value and "/user/oauth2/" in value:
                self.url = value
                return
|
||
|
|
||
|
|
||
|
class Hostea:
    """Client for the dashboard's HTML forms (register, login, support page).

    All requests go through the module-level session ``c``. The checks are
    plain ``assert`` statements, so a failed expectation raises
    ``AssertionError``.
    """

    def __init__(
        self,
        username: str,
        email: str,
        password: str,
        url: str,
        *,
        mail_url: str = "http://localhost:1080/email/",
    ):
        """Store the account details and the parsed dashboard base *url*.

        ``mail_url`` is the JSON endpoint queried for the verification mail
        (presumably a local mail catcher -- confirm); the default is the
        address that used to be hard-coded.
        """
        self.username = username
        self.email = email
        self.password = password
        self.url = urlparse(url)
        self.csrf_key = "csrfmiddlewaretoken"
        # Stored without the trailing slash so both the listing URL and the
        # per-message URL can be built from it.
        self.mail_url = mail_url.rstrip("/")

    def _endpoint(self, path: str) -> str:
        """Return the absolute dashboard URL for *path* (scheme + host + path)."""
        return urlunparse((self.url.scheme, self.url.netloc, path, "", "", ""))

    def get_csrf(self, url: str) -> str:
        """GET *url* (must answer 200) and return the CSRF token found in it.

        Returns ``None`` when the page has no input named ``self.csrf_key``.
        """
        resp = c.get(url)
        assert resp.status_code == 200
        parser = ParseCSRF(name=self.csrf_key)
        parser.feed(resp.text)
        return parser.token

    def __get_verification_link(self):
        """Return the verification link mailed to ``self.email``.

        Looks through the messages listed at ``mail_url`` for one whose first
        recipient is ``self.email``, deletes it, and returns the second line
        of its text body with surrounding whitespace stripped.

        Raises:
            RuntimeError: no message addressed to ``self.email`` was found.
        """
        resp = c.get(f"{self.mail_url}/")
        for message in resp.json():
            if message["to"][0]["address"] == self.email:
                logging.info("[Dashboard] Found verification link")
                # Remove the message from the mail endpoint once it is read.
                c.delete(f"{self.mail_url}/{message['id']}")
                return message["text"].split("\n")[1].strip()

        logging.critical("[Dashboard] Verification link not found")
        # Fail here instead of returning None and crashing later inside
        # c.get(None) with an unrelated error.
        raise RuntimeError(f"no verification email found for {self.email}")

    def register(self):
        """Register the account, then confirm it via the emailed link."""
        url = self._endpoint("/register/")
        csrf = self.get_csrf(url)
        payload = {
            "username": self.username,
            "password": self.password,
            "email": self.email,
            "confirm_password": self.password,
            self.csrf_key: csrf,
        }

        logging.info("Registering user")
        resp = c.post(url, payload, allow_redirects=False)
        assert resp.status_code == 302
        assert "pending" in resp.headers["Location"]

        # The verification page is itself a form: fetch it for a fresh CSRF
        # token, then POST that token back to the same link.
        email_verification_link = self.__get_verification_link()
        csrf = self.get_csrf(email_verification_link)
        payload = {
            self.csrf_key: csrf,
        }
        resp = c.post(email_verification_link, payload, allow_redirects=False)
        assert resp.status_code == 302
        assert resp.headers["Location"] == "/login/"
        logging.info("[Dashboard] Email verified user")

    def login(self):
        """Log in through ``/login/`` and check ``/support/new/`` answers 200."""
        url = self._endpoint("/login/")

        csrf = self.get_csrf(url)
        payload = {
            "login": self.username,
            "password": self.password,
            self.csrf_key: csrf,
        }

        logging.info("Logging In user")
        resp = c.post(url, payload, allow_redirects=False)

        assert resp.status_code == 302
        assert resp.headers["Location"] == "/"

        resp = c.get(self._endpoint("/support/new/"))
        assert resp.status_code == 200

    def new_ticket(self, support_repository_new_issue: str):
        """Assert that ``/support/new/`` contains *support_repository_new_issue*."""
        resp = c.get(self._endpoint("/support/new/"))
        assert support_repository_new_issue in resp.text
|
||
|
|
||
|
|
||
|
class Gitea:
    """Client for the Gitea side of the flow: SSO sign-up and the new-issue page.

    All requests go through the module-level session ``c``. The checks are
    plain ``assert`` statements, so a failed expectation raises
    ``AssertionError``.
    """

    def __init__(
        self,
        username: str,
        email: str,
        gitea_host: str,
        hostea_org: str,
        support_repo: str,
    ):
        """Store the account details and precompute the Gitea URLs used below.

        ``issues_uri`` and ``new_issues_uri`` are the public results: the
        issue list and the new-issue form of ``hostea_org/support_repo`` on
        ``gitea_host``.
        """
        self.username = username
        self.gitea_host = gitea_host
        self.hostea_org = hostea_org
        self.support_repo = support_repo
        self.email = email

        self.__csrf_key = "_csrf"

        url = urlparse(self.gitea_host)
        repo = f"{self.hostea_org}/{self.support_repo}"
        issues = f"{repo}/issues"
        new_issues = f"{issues}/new"

        def absolute(path: str) -> str:
            # Same scheme/host as gitea_host, with only the path replaced.
            return urlunparse((url.scheme, url.netloc, path, "", "", ""))

        # Not read by the current code path; it belonged to the manual
        # redirect-by-redirect variant of _sso_login.
        self.__partial_call_back_url = absolute("/user/oauth2/")
        self.__login = absolute("/user/login/")
        self.__link_acount = absolute("/user/link_account/")
        self.__link_acount_signup = absolute("/user/link_account_signup/")

        self.__me = absolute(f"/{self.username}")

        self.issues_uri = absolute(issues)
        self.new_issues_uri = absolute(new_issues)

    def get_csrf(self, url: str) -> str:
        """GET *url* and return the ``_csrf`` token found in it (``None`` if absent)."""
        resp = c.get(url)
        parser = ParseCSRF.gitea_parser()
        parser.feed(resp.text)
        return parser.token

    def _sso_login(self):
        """Sign in through the SSO link on the login page and link the account.

        Steps: find the ``/user/oauth2/`` link on the login page, follow it
        with redirects enabled, submit the link-account sign-up form, then
        check that the profile page shows the username.

        With redirects followed automatically, the chain is expected to be:
        307 from the SSO URL to the provider, a redirect back to the
        ``/user/oauth2/`` callback, then 303 to the link-account page.
        """
        resp = c.get(self.__login)
        parser = ParseSSOLogin()
        parser.feed(resp.text)
        # Fail clearly here rather than with a TypeError inside urlunparse.
        assert parser.url, "SSO link not found on the Gitea login page"

        url = urlparse(self.gitea_host)
        # SSO URL in Gitea login page
        sso = urlunparse((url.scheme, url.netloc, parser.url, "", "", ""))

        # Redirects are left enabled for a simpler implementation.
        resp = c.get(sso)

        # To register the account, the user visits the form at
        # self.__link_acount, which POSTs to self.__link_acount_signup.
        # The CSRF token therefore has to be collected from the form page.
        csrf = self.get_csrf(self.__link_acount)
        payload = {
            "user_name": self.username,
            "email": self.email,
            self.__csrf_key: csrf,
        }

        resp = c.post(self.__link_acount_signup, payload, allow_redirects=False)
        assert resp.status_code == 303
        # Derived from the configured org/repo; equals
        # "/Hostea/support/issues/new" for the default setup.
        assert resp.headers["Location"] == urlparse(self.new_issues_uri).path

        resp = c.get(self.__me)
        assert resp.status_code == 200
        assert self.username in resp.text

    def new_issue(self):
        """Check the new-issue page needs login, then is reachable after SSO."""
        resp = c.get(self.new_issues_uri, allow_redirects=False)
        # This used to be an assignment (``resp.status_code = 303``), so the
        # redirect status was never actually checked.
        assert resp.status_code == 303
        assert "/user/login" in resp.headers["Location"]

        self._sso_login()
        resp = c.get(self.new_issues_uri, allow_redirects=False)
        assert resp.status_code == 200
|
||
|
|
||
|
|
||
|
def main():
    """Run the scenario: register, log in, check the support page, open Gitea.

    Uses a dashboard at ``http://localhost:8000`` and a Gitea instance at
    ``http://localhost:8080``; any failed expectation raises from the step
    that detected it.
    """
    dashboard = Hostea(
        url="http://localhost:8000",
        username="enough",
        email="enough@example.org",
        password="asdfas234234vaa",
    )
    forge = Gitea(
        username=dashboard.username,
        email=dashboard.email,
        gitea_host="http://localhost:8080",
        hostea_org="Hostea",
        support_repo="support",
    )

    # Steps run strictly in this order.
    scenario = (
        dashboard.register,
        dashboard.login,
        lambda: dashboard.new_ticket(forge.new_issues_uri),
        forge.new_issue,
    )
    for step in scenario:
        step()

    logging.info("All tests passed")


if __name__ == "__main__":
    main()
|