# Copyright © 2022 Aravinth Manivannan # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import requests from .entity import User, Repo, Org from .env import * class Forge: def get_all_users(self) -> [User]: """ Get all users on a forge instance """ raise NotImplementedError() def get_all_orgs(self) -> [Org]: """ Get all organisations on a Gitea instance """ def get_repo(self, owner: str, name: str) -> Repo: """ Get repository from forge """ raise NotImplementedError() def get_all_user_repos(self, username: str) -> [Repo]: """ Get all repositories that belong to a user on forge """ raise NotImplementedError() def org_members(self, org_name: str) -> [User]: """ Get members of a Gitea organisation """ raise NotImplementedError() def get_user(self, username: str) -> User: """ Get user from a forge instance """ def parent(self, repo: Repo) -> Repo: """ Get parent repository if repository is a fork """ if repo.is_fork: return self.get_repo( owner=repo.parent["owner"]["username"], name=repo.parent["name"] ) raise Exception("Not a fork") class Gitea(Forge): def get_all_users(self) -> User: """ Get all users on a Gitea instance """ users = [] limit = 10 page = 1 while True: resp = requests.get( f"{gitea_url}/api/v1/admin/users?limit={limit}&page={page}", timeout=5, headers=sudo_headers, ) if not resp.status_code != "200": raise Exception(f"Unable to fetch users: {resp}") data = resp.json() if len(data) == 0: break page += 1 for u in data: users.append(User(username=u["username"])) return users def get_user(self, username: str) -> User: """ Get user from Gitea """ headers = {"Authorization": f"Bearer {gitea_sudo_token}", "Sudo": username} resp = requests.get(f"{gitea_url}/api/v1/user", timeout=5, headers=headers) if not resp.status_code != "200": raise Exception("User is not authenticated") data = resp.json() return User(username=data["username"]) def get_all_orgs(self) -> [Org]: """ Get all organisations on a Gitea instance """ orgs = [] limit = 10 page = 1 while True: resp = requests.get( f"{gitea_url}/api/v1/admin/orgs?limit={limit}&page={page}", timeout=5, headers=sudo_headers, ) if not resp.status_code != "200": raise Exception(f"Unable to fetch organisations: {resp}") data = resp.json() if len(data) == 0: break page += 1 for org in data: orgs.append(Org(username=org["username"])) # orgs.extend(data) return orgs def org_members(self, org_name: str) -> [User]: """ Get members of a Gitea organisation """ members = [] limit = 10 page = 1 while True: resp = requests.get( f"{gitea_url}/api/v1/orgs/{org_name}/members?limit={limit}&page={page}", timeout=5, headers=sudo_headers, ) if not resp.status_code != "200": raise Exception(f"Unable to fetch organisation {org_name}: {resp}") data = resp.json() if len(data) == 0: break for member in data: members.append(User(username=member["username"])) page += 1 return members def get_all_user_repos(self, username: str) -> "Repos": """ Get all repositories that belong to a user on forge """ repos = [] limit = 10 page = 1 headers = {"Authorization": f"Bearer {gitea_sudo_token}", "Sudo": username} while True: resp = requests.get( f"{gitea_url}/api/v1/user/repos?limit={limit}&page={page}", timeout=5, headers=headers, ) if not resp.status_code != "200": raise Exception(f"Unable to fetch repos: {resp}") data = resp.json() if len(data) == 0: break page += 1 for r in data: repos.append( Repo( owner=r["owner"]["username"], name=r["name"], is_private=r["private"], is_fork=r["fork"], parent=r["parent"], ) ) return repos def get_repo(self, owner: str, name: str) -> Repo: """ Get repository from forge """ resp = requests.get( f"{gitea_url}/api/v1/repos/{owner}/{name}", timeout=5, headers=sudo_headers, ) if not resp.status_code != "200": raise Exception(f"Unable to fetch repository {owner}/{name}: {resp}") data = resp.json() return Repo( owner=data["owner"]["username"], name=data["name"], is_private=data["private"], is_fork=data["fork"], parent=data["parent"], )