dashboard/integration/forgejo.py

423 lines
14 KiB
Python
Executable File

import os
import random
from html.parser import HTMLParser
from time import sleep
from urllib.parse import quote, urlparse, urlunparse

import requests
from requests import Session
from requests.auth import HTTPBasicAuth

from .csrf import ParseCSRF
# FORGEJO_USER = "root"
# FORGEJO_EMAIL = "root@example.com"
# FORGEJO_PASSWORD = "foobarpassword"
# HOST = "http://localhost:8080"
#
# REPOS = []
class Forgejo:
    """
    Drive a Forgejo instance through its web forms and REST API.

    Web-form operations (install, register, login, repository creation,
    SSO setup) go through the shared session ``c`` and scrape a CSRF token
    from the relevant page first. REST API calls authenticate with HTTP
    basic credentials embedded in the URL (see ``get_api_uri``).
    """

    def __init__(self, host: str, username: str, password: str, email: str, c: Session):
        """
        - host: base URL of the Forgejo instance; only scheme and netloc are used
        - username / password / email: account used for every operation
        - c: session shared by all requests so that cookies persist
        """
        self.host = host
        self.username = username
        self.password = password
        self.email = email
        self.c = c
        self.__csrf_key = "_csrf"
        self.__logged_in = False

    def get_uri(self, path: str):
        """
        Build an absolute URI for `path` on the configured host
        """
        parsed = urlparse(self.host)
        return urlunparse((parsed.scheme, parsed.netloc, path, "", "", ""))

    def get_api_uri(self, path: str):
        """
        Build an absolute URI for `path` with the account credentials
        embedded as ``user:password@host`` (HTTP basic auth)
        """
        parsed = urlparse(self.host)
        # Percent-encode the credentials: a raw "@", ":" or "/" inside the
        # username or password would otherwise corrupt the netloc.
        username = quote(self.username, safe="")
        password = quote(self.password, safe="")
        return urlunparse(
            (
                parsed.scheme,
                f"{username}:{password}@{parsed.netloc}",
                path,
                "",
                "",
                "",
            )
        )

    @staticmethod
    def check_online(host: str, retry_interval: float = 2, timeout: float = 10):
        """
        Check if Forgejo instance is online

        Polls ``api/v1/nodeinfo`` on `host` until it answers with HTTP 200
        or 302, and only then returns.

        - retry_interval: seconds to wait between two attempts
        - timeout: seconds after which a single request is abandoned
        """
        parsed = urlparse(host)
        url = urlunparse((parsed.scheme, parsed.netloc, "api/v1/nodeinfo", "", "", ""))
        count = 0
        while True:
            try:
                res = requests.get(url, allow_redirects=False, timeout=timeout)
                if res.status_code in (200, 302):
                    return
            except requests.exceptions.RequestException:
                # The instance is not reachable yet; fall through and retry.
                pass
            # Wait after *every* failed attempt, including unexpected status
            # codes, so the loop never hammers the server.
            sleep(retry_interval)
            print(f"Retrying {count} time")
            count += 1

    def install(self):
        """
        Install Forgejo, first form that a user sees when a new instance is
        deployed

        Posts the installation form to the instance root with SQLite settings
        and no admin account, then pauses for 10 seconds.
        """
        payload = {
            "db_type": "sqlite3",
            "db_host": "localhost:3306",
            "db_user": "root",
            "db_passwd": "",
            "db_name": "forgejo",
            "ssl_mode": "disable",
            "db_schema": "",
            "charset": "utf8",
            "db_path": "/data/gitea/gitea.db",
            # NOTE(review): this value looks copied from a URL-encoded request
            # body; the "+" characters are sent literally -- confirm intended.
            "app_name": "Forgejo:+Beyond+Coding+We+Forge",
            "repo_root_path": "/data/git/repositories",
            "lfs_root_path": "/data/git/lfs",
            "run_user": "git",
            "domain": "localhost",
            "ssh_port": "22",
            "http_port": "3000",
            "app_url": "http://localhost:3000/",
            "log_root_path": "/data/gitea/log",
            "smtp_host": "",
            "smtp_from": "",
            "smtp_user": "",
            "smtp_passwd": "",
            "enable_federated_avatar": "on",
            "enable_open_id_sign_in": "on",
            "enable_open_id_sign_up": "on",
            "default_allow_create_organization": "on",
            "default_enable_timetracking": "on",
            "no_reply_address": "noreply.localhost",
            "password_algorithm": "pbkdf2",
            "admin_name": "",
            "admin_passwd": "",
            "admin_confirm_passwd": "",
            "admin_email": "",
        }
        self.c.post(self.get_uri(""), data=payload)
        # Presumably gives the instance time to apply the configuration
        # before the next request -- TODO confirm.
        sleep(10)

    def get_csrf_token(self, url: str) -> str:
        """
        Get CSRF token at a URI

        Raises an Exception when the page answers with anything other than
        HTTP 200 or 302.
        """
        resp = self.c.get(url, allow_redirects=False)
        if resp.status_code not in (200, 302):
            print(resp.status_code, resp.text)
            raise Exception(f"Can't get csrf token: {resp.status_code}")
        parser = ParseCSRF(name=self.__csrf_key)
        parser.feed(resp.text)
        return parser.token

    def register(self):
        """
        Register User

        Submits the sign-up form; the response is not inspected.
        """
        url = self.get_uri("/user/sign_up")
        payload = {
            self.__csrf_key: self.get_csrf_token(url),
            "user_name": self.username,
            "password": self.password,
            "retype": self.password,
            "email": self.email,
        }
        self.c.post(url, data=payload, allow_redirects=False)

    def login(self):
        """
        Login, must be called at least once before performing authenticated
        operations

        Does nothing when a previous call already succeeded. Raises an
        Exception when the login form answers with anything other than
        HTTP 200, 302 or 303.
        """
        if self.__logged_in:
            return
        url = self.get_uri("/user/login")
        payload = {
            self.__csrf_key: self.get_csrf_token(url),
            "user_name": self.username,
            "password": self.password,
            "remember": "on",
        }
        resp = self.c.post(url, data=payload, allow_redirects=False)
        if resp.status_code not in (200, 302, 303):
            raise Exception(
                f"[ERROR] Authentication failed. status code {resp.status_code}"
            )
        print("User logged in")
        self.__logged_in = True

    def create_repository(self, name: str):
        """
        Create repository

        Logs in if needed, looks up the numeric user id through the REST API
        and submits the "new repository" form. Raises an Exception when the
        form answers with anything other than HTTP 200 or 302.
        """
        self.login()
        url = self.get_uri("/repo/create")
        user_id = self.c.get(self.get_api_uri("/api/v1/user")).json()["id"]
        payload = {
            self.__csrf_key: self.get_csrf_token(url),
            "uid": user_id,
            "repo_name": name,
            "description": f"this repository is named {name}",
            "repo_template": "",
            "issue_labels": "",
            "gitignores": "",
            "license": "",
            "readme": "Default",
            "default_branch": "master",
            "trust_model": "default",
        }
        resp = self.c.post(url, data=payload, allow_redirects=False)
        if resp.status_code not in (200, 302):
            raise Exception(
                f"Error while creating repository: {name} {resp.status_code}"
            )
        # Report success only once the status code has been verified.
        print(f"Created repository {name}")

    def install_sso(
        self,
        sso_name: str,
        client_id: str,
        client_secret: str,
        sso_auto_discovery_url: str,
    ):
        """
        Install SSO.
        - sso_name: human readable SSO name. Doesn't have any functionality
          except assisting admins ID an SSO by a name
        - client_id: OAuth stuff, will be generated by the SSO that should be integrated
        - client_secret: OAuth stuff, will be generated by the SSO that should be integrated
        - sso_auto_discovery_url: Again OAuth stuff. Should be available in the SSO documentation.
          In Hostea SSO's case, it is available at
          https://hostea-dash.example.org/o/.well-known/openid-configuration/
        """
        self.login()
        url = self.get_uri("/admin/auths/new")
        payload = {
            "_autofill_dummy_username": "",
            "_autofill_dummy_password": "",
            self.__csrf_key: self.get_csrf_token(url),
            "type": "6",
            "name": sso_name,
            "security_protocol": "",
            "host": "",
            "port": "",
            "bind_dn": "",
            "bind_password": "",
            "user_base": "",
            "user_dn": "",
            "filter": "",
            "admin_filter": ["", ""],
            "attribute_username": "",
            "attribute_name": "",
            "attribute_surname": "",
            "attribute_mail": "",
            "attribute_ssh_public_key": "",
            "attribute_avatar": "",
            "group_dn": "",
            "group_member_uid": "",
            "user_uid": "",
            "group_filter": "",
            "group_team_map": "",
            "search_page_size": "",
            "smtp_auth": "PLAIN",
            "smtp_host": "",
            "smtp_port": "",
            "helo_hostname": "",
            "allowed_domains": "",
            "pam_service_name": "",
            "pam_email_domain": "",
            "oauth2_provider": "openidConnect",
            "oauth2_key": client_id,
            "oauth2_secret": client_secret,
            "oauth2_icon_url": "",
            "open_id_connect_auto_discovery_url": sso_auto_discovery_url,
            "oauth2_auth_url": "",
            "oauth2_token_url": "",
            "oauth2_profile_url": "",
            "oauth2_email_url": "",
            "oauth2_tenant": "",
            "oauth2_scopes": "openid",
            "oauth2_required_claim_name": "",
            "oauth2_required_claim_value": "",
            "oauth2_group_claim_name": "",
            "oauth2_admin_group": "",
            "oauth2_restricted_group": "",
            "sspi_auto_create_users": "on",
            "sspi_auto_activate_users": "on",
            "sspi_strip_domain_names": "on",
            "sspi_separator_replacement": "_",
            "sspi_default_language": "",
            "is_sync_enabled": "on",
            "is_active": "on",
        }
        self.c.post(url, data=payload)

    def add_deploy_key(self, repo: str, key: str):
        """
        Add a read-write deploy key to one of the user's repositories

        - repo: repository name, owned by the configured user
        - key: path of the file holding the public key

        Raises AssertionError when the API does not answer with HTTP 201.
        """
        url = self.get_api_uri(f"/api/v1/repos/{self.username}/{repo}/keys")
        with open(key, "r", encoding="utf-8") as f:
            public_key = f.read()
        payload = {
            "key": public_key,
            "read_only": False,
            "title": f"{self.username}/{repo} Dashboard test key",
        }
        resp = self.c.post(url, json=payload)
        # Raised explicitly (rather than via `assert`) so the check is not
        # stripped when Python runs with -O.
        if resp.status_code != 201:
            raise AssertionError(
                f"Adding deploy key to {self.username}/{repo} failed: "
                f"status code {resp.status_code}"
            )
class ParseSSOLogin(HTMLParser):
    """
    Find the SSO login link in an HTML page.

    After ``feed()``, ``url`` holds the ``href`` of the first ``<a>`` tag
    whose target contains ``/user/oauth2/``, or None when there is no such
    link.
    """

    # Class-level default so `url` can be read before any markup is fed.
    url: "str | None" = None

    def handle_starttag(self, tag: str, attrs: list):
        """
        Record the first anchor whose href points at the OAuth2 login route

        - tag: lower-cased tag name
        - attrs: list of (name, value) pairs; value is None for attributes
          written without a value (e.g. ``<a href>``)
        """
        # Only the first match is kept; ignore everything once it is found.
        if self.url or tag != "a":
            return
        for key, value in attrs:
            # `value` may be None, which does not support the `in` test.
            if key == "href" and value and "/user/oauth2/" in value:
                self.url = value
                return
class ForgejoSSO:
    """
    Exercise the Forgejo SSO sign-in flow for one user.

    The session ``c`` is expected to be already authenticated against the
    SSO provider -- TODO confirm against callers; this class only performs
    the Forgejo side (following the SSO link and linking the account).
    """

    def __init__(
        self,
        username: str,
        email: str,
        forgejo_host: str,
        hostea_org: str,
        support_repo: str,
        c: Session,
    ):
        """
        - username / email: identity used when linking the account in Forgejo
        - forgejo_host: base URL of the Forgejo instance; only scheme and
          netloc are used
        - hostea_org / support_repo: owner and name of the repository whose
          issue pages are exposed as `issues_uri` and `new_issues_uri`
        - c: session shared by all requests so that cookies persist
        """
        self.c = c
        self.username = username
        self.forgejo_host = forgejo_host
        self.hostea_org = hostea_org
        self.support_repo = support_repo
        self.email = email
        self.__csrf_key = "_csrf"

        issues = f"{self.hostea_org}/{self.support_repo}/issues"
        self.__partial_call_back_url = self._absolute_uri("/user/oauth2/")
        self.__login = self._absolute_uri("/user/login/")
        self.__link_account = self._absolute_uri("/user/link_account/")
        self.__link_account_signup = self._absolute_uri("/user/link_account_signup/")
        self.__me = self._absolute_uri(f"/{self.username}")
        self.issues_uri = self._absolute_uri(issues)
        self.new_issues_uri = self._absolute_uri(f"{issues}/new")

    def _absolute_uri(self, path: str) -> str:
        """
        Build an absolute URI for `path` on the Forgejo host
        """
        parsed = urlparse(self.forgejo_host)
        return urlunparse((parsed.scheme, parsed.netloc, path, "", "", ""))

    def get_csrf(self, url: str) -> str:
        """
        Fetch `url` and return the CSRF token found in the page
        """
        resp = self.c.get(url)
        parser = ParseCSRF(name=self.__csrf_key)
        parser.feed(resp.text)
        return parser.token

    def _sso_login(self):
        """
        Sign in to Forgejo through the SSO link and link a new account

        Steps: locate the SSO link on the login page, follow it (redirects
        enabled), submit the "link account" sign-up form, then verify that
        the user's profile page is reachable and mentions the username.
        """
        resp = self.c.get(self.__login)
        parser = ParseSSOLogin()
        parser.feed(resp.text)
        # Without this check a missing link would silently turn into a
        # request for the host root.
        assert parser.url, "No SSO login link found on the Forgejo login page"

        # Redirects are followed automatically for a simpler implementation.
        # Per the original author's notes, the manual flow is: the SSO URL
        # answers 307 towards the SSO for authorization, which redirects back
        # to the /user/oauth2/ callback, which answers 303 towards the
        # link-account page.
        self.c.get(self._absolute_uri(parser.url))

        # The sign-up form lives on the link-account page but posts to the
        # link-account-signup endpoint, so the CSRF token has to be collected
        # from the former.
        csrf = self.get_csrf(self.__link_account)
        payload = {
            "user_name": self.username,
            "email": self.email,
            self.__csrf_key: csrf,
        }
        resp = self.c.post(
            self.__link_account_signup, data=payload, allow_redirects=True
        )
        # The redirect chain differs between Forgejo versions, so only the
        # final status is checked.
        assert resp.status_code == 200

        resp = self.c.get(self.__me)
        assert resp.status_code == 200
        assert self.username in resp.text

    def new_issue(self):
        """
        Check that the "new issue" page requires login and opens after SSO

        An anonymous request must be redirected to the login page; after
        `_sso_login()` the same page must answer with HTTP 200.
        """
        resp = self.c.get(self.new_issues_uri, allow_redirects=False)
        # The original statement assigned 303 to the response instead of
        # checking it. Both 302 and 303 are accepted because the redirect
        # status has varied between versions.
        assert resp.status_code in (302, 303)
        assert "/user/login" in resp.headers["Location"]
        self._sso_login()
        resp = self.c.get(self.new_issues_uri, allow_redirects=False)
        assert resp.status_code == 200