"""GitHub Utilities"""

import json
import os
import warnings

from dataclasses import dataclass
from typing import Any, Callable, cast, Dict, List, Optional, Tuple, Union
from urllib.error import HTTPError
from urllib.parse import quote
from urllib.request import Request, urlopen


GITHUB_API_URL = "https://api.github.com"


@dataclass
class GitHubComment:
    body_text: str
    created_at: str
    author_login: str
    author_association: str
    editor_login: Optional[str]
    database_id: int
    url: str


def gh_fetch_url_and_headers(
    url: str,
    *,
    headers: Optional[Dict[str, str]] = None,
    data: Union[Optional[Dict[str, Any]], str] = None,
    method: Optional[str] = None,
    reader: Callable[[Any], Any] = lambda x: x.read(),
) -> Tuple[Any, Any]:
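    """Issue an HTTP request to ``url`` and return ``(response headers, reader(response))``.

    If the ``GITHUB_TOKEN`` environment variable is set, it is attached as a
    token Authorization header for ``api.github.com`` URLs.  Dict ``data`` is
    JSON-encoded, while string ``data`` is sent as-is.  A 403 response that
    carries rate-limit headers has its rate-limit details printed before the
    ``HTTPError`` is re-raised.
    """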
    if headers is None:
        headers = {}
    token = os.environ.get("GITHUB_TOKEN")
    if token is not None and url.startswith(f"{GITHUB_API_URL}/"):
        headers["Authorization"] = f"token {token}"

    data_ = None
    if data is not None:
        data_ = data.encode() if isinstance(data, str) else json.dumps(data).encode()

    try:
        with urlopen(Request(url, headers=headers, data=data_, method=method)) as conn:
            return conn.headers, reader(conn)
    except HTTPError as err:
        if err.code == 403 and all(
            key in err.headers for key in ["X-RateLimit-Limit", "X-RateLimit-Used"]
        ):
            print(
                f"""Rate limit exceeded:
                Used: {err.headers['X-RateLimit-Used']}
                Limit: {err.headers['X-RateLimit-Limit']}
                Remaining: {err.headers['X-RateLimit-Remaining']}
                Resets at: {err.headers['X-RateLimit-Reset']}"""
            )
        raise


def gh_fetch_url(
    url: str,
    *,
    headers: Optional[Dict[str, str]] = None,
    data: Union[Optional[Dict[str, Any]], str] = None,
    method: Optional[str] = None,
    reader: Callable[[Any], Any] = lambda x: x.read(),
) -> Any:
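    """Fetch ``url`` and return the response body as transformed by ``reader``
    (raw bytes by default); authentication and rate-limit reporting are
    handled by ``gh_fetch_url_and_headers``."""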
    return gh_fetch_url_and_headers(
        url, headers=headers, data=data, reader=reader, method=method
    )[1]


def gh_fetch_json(
    url: str,
    params: Optional[Dict[str, Any]] = None,
    data: Optional[Dict[str, Any]] = None,
    method: Optional[str] = None,
) -> List[Dict[str, Any]]:
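    """Call ``url`` with the REST v3 Accept header, appending ``params`` as a
    query string, and return the JSON-decoded response (typed as a list)."""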
    headers = {"Accept": "application/vnd.github.v3+json"}
    if params is not None and len(params) > 0:
        url += "?" + "&".join(
            f"{name}={quote(str(val))}" for name, val in params.items()
        )
    return cast(
        List[Dict[str, Any]],
        gh_fetch_url(url, headers=headers, data=data, reader=json.load, method=method),
    )


def _gh_fetch_json_any(
    url: str,
    params: Optional[Dict[str, Any]] = None,
    data: Optional[Dict[str, Any]] = None,
) -> Any:
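    """Like ``gh_fetch_json`` but without a ``method`` argument and with an
    untyped return value; used by the list/dict wrappers below."""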
    headers = {"Accept": "application/vnd.github.v3+json"}
    if params is not None and len(params) > 0:
        url += "?" + "&".join(
            f"{name}={quote(str(val))}" for name, val in params.items()
        )
    return gh_fetch_url(url, headers=headers, data=data, reader=json.load)


def gh_fetch_json_list(
    url: str,
    params: Optional[Dict[str, Any]] = None,
    data: Optional[Dict[str, Any]] = None,
) -> List[Dict[str, Any]]:
    return cast(List[Dict[str, Any]], _gh_fetch_json_any(url, params, data))


def gh_fetch_json_dict(
    url: str,
    params: Optional[Dict[str, Any]] = None,
    data: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    return cast(Dict[str, Any], _gh_fetch_json_any(url, params, data))


def gh_graphql(query: str, **kwargs: Any) -> Dict[str, Any]:
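    """Run a GraphQL ``query`` with ``kwargs`` as its variables and return the
    decoded response, raising ``RuntimeError`` if it contains errors."""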
    rc = gh_fetch_url(
        f"{GITHUB_API_URL}/graphql",
        data={"query": query, "variables": kwargs},
        reader=json.load,
    )
    if "errors" in rc:
        raise RuntimeError(
            f"GraphQL query {query}, args {kwargs} failed: {rc['errors']}"
        )
    return cast(Dict[str, Any], rc)


def _gh_post_comment(
    url: str, comment: str, dry_run: bool = False
) -> List[Dict[str, Any]]:
    if dry_run:
        print(comment)
        return []
    return gh_fetch_json_list(url, data={"body": comment})


def gh_post_pr_comment(
    org: str, repo: str, pr_num: int, comment: str, dry_run: bool = False
) -> List[Dict[str, Any]]:
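    """Post ``comment`` on pull request ``pr_num`` via the issue-comments
    endpoint; with ``dry_run`` the comment is only printed."""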
    return _gh_post_comment(
        f"{GITHUB_API_URL}/repos/{org}/{repo}/issues/{pr_num}/comments",
        comment,
        dry_run,
    )


def gh_post_commit_comment(
    org: str, repo: str, sha: str, comment: str, dry_run: bool = False
) -> List[Dict[str, Any]]:
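    """Post ``comment`` on commit ``sha``; with ``dry_run`` the comment is
    only printed."""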
    return _gh_post_comment(
        f"{GITHUB_API_URL}/repos/{org}/{repo}/commits/{sha}/comments",
        comment,
        dry_run,
    )


def gh_delete_comment(org: str, repo: str, comment_id: int) -> None:
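    """Delete the issue/PR comment with the given ``comment_id``."""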
| 165 | + url = f"{GITHUB_API_URL}/repos/{org}/{repo}/issues/comments/{comment_id}" |
| 166 | + gh_fetch_url(url, method="DELETE") |
| 167 | + |
| 168 | + |
| 169 | +def gh_fetch_merge_base(org: str, repo: str, base: str, head: str) -> str: |
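    """Return the merge-base commit SHA of ``base`` and ``head``, or an empty
    string if it cannot be determined."""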
    merge_base = ""
    # Get the merge base using the GitHub REST API. This is equivalent to
    # running git merge-base locally, without needing a git checkout. The API
    # doc can be found at
    # https://docs.github.com/en/rest/commits/commits?apiVersion=2022-11-28#compare-two-commits
    try:
        json_data = gh_fetch_url(
            f"{GITHUB_API_URL}/repos/{org}/{repo}/compare/{base}...{head}",
            headers={"Accept": "application/vnd.github.v3+json"},
            reader=json.load,
        )
        if json_data:
            merge_base = json_data.get("merge_base_commit", {}).get("sha", "")
        else:
            warnings.warn(
                f"Failed to get merge base for {base}...{head}: Empty response"
            )
    except Exception as error:
        warnings.warn(f"Failed to get merge base for {base}...{head}: {error}")

    return merge_base


def gh_update_pr_state(org: str, repo: str, pr_num: int, state: str = "open") -> None:
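    """Set pull request ``pr_num`` to ``state`` ("open" or "closed").

    Re-opening a PR whose head branch has been deleted only emits a warning
    instead of raising.
    """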
| 193 | + url = f"{GITHUB_API_URL}/repos/{org}/{repo}/pulls/{pr_num}" |
| 194 | + try: |
| 195 | + gh_fetch_url(url, method="PATCH", data={"state": state}) |
| 196 | + except HTTPError as err: |
| 197 | + # When trying to open the pull request, error 422 means that the branch |
| 198 | + # has been deleted and the API couldn't re-open it |
| 199 | + if err.code == 422 and state == "open": |
| 200 | + warnings.warn( |
| 201 | + f"Failed to open {pr_num} because its head branch has been deleted: {err}" |
| 202 | + ) |
| 203 | + else: |
| 204 | + raise |
| 205 | + |
| 206 | + |
| 207 | +def gh_query_issues_by_labels( |
| 208 | + org: str, repo: str, labels: List[str], state: str = "open" |
| 209 | +) -> List[Dict[str, Any]]: |
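    """Return the issues in ``org/repo`` that carry every label in ``labels``
    and match ``state``."""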
| 210 | + url = f"{GITHUB_API_URL}/repos/{org}/{repo}/issues" |
| 211 | + return gh_fetch_json( |
| 212 | + url, method="GET", params={"labels": ",".join(labels), "state": state} |
| 213 | + ) |
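

# Minimal usage sketch (not part of the utility API). The org, repo, and PR
# number below are hypothetical placeholders; with dry_run=True the comment
# helper only prints locally and makes no API call, so this runs without
# network access or a GITHUB_TOKEN.
if __name__ == "__main__":
    gh_post_pr_comment("octocat", "hello-world", 1, "Example comment", dry_run=True)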