-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathgithub_api.py
More file actions
106 lines (81 loc) · 4.56 KB
/
github_api.py
File metadata and controls
106 lines (81 loc) · 4.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import time
from typing import Any
import aiohttp
class GitHubAPI:
def __init__(self, tokens: list[str]):
self.base_url = "https://api.github.com"
self.tokens = tokens
self.current_token_index = 0
self.token_last_used = [.0] * len(tokens)
self.cache = {}
self.cache_ttl = 1800
self.rate_limit_threshold = 2
def _get_headers(self):
return {
"Authorization": f"token {self.tokens[self.current_token_index]}",
"Accept": "application/vnd.github.v3+json"
}
async def _clean_cache(self):
current_time = time.time()
self.cache = {key: value for key, value in self.cache.items() if current_time - value[1] <= self.cache_ttl}
async def _rotate_token(self):
current_time = time.time()
self.current_token_index = (self.current_token_index + 1) % len(self.tokens)
if current_time - self.token_last_used[self.current_token_index] > 3600:
self.current_token_index = 0
self.token_last_used[self.current_token_index] = current_time
async def request(self, method, endpoint, headers = {}, **kwargs) -> aiohttp.ClientResponse:
await self._clean_cache()
cache_key = f"{method}:{endpoint}:{str(kwargs)}"
if cache_key in self.cache and endpoint != "/rate_limit":
cached_response, _ = self.cache[cache_key]
return cached_response
async with aiohttp.ClientSession() as session:
url = f"{self.base_url}{endpoint}"
headers = self._get_headers() | headers
async with session.request(method, url, headers=headers, **kwargs) as response:
remaining = int(response.headers.get('X-RateLimit-Remaining', 0))
if remaining <= self.rate_limit_threshold:
await self._rotate_token()
response.raise_for_status()
await response.text()
self.cache[cache_key] = (response, time.time())
return response
async def fetch_github_page(self, url: str, page: int, per_fetch: int, headers: dict = {}, params: dict = {}) -> tuple[list[dict[str, Any]], int]:
resp = await self.request("GET", url, headers=headers, params={"page": page, "per_page": per_fetch} | params)
url = resp.links.get('last', {}).get('url')
if url:
return await resp.json(), int(url.query.get("page"))
else:
return await resp.json(), page
async def search_github_page(self, url: str, page: int, per_fetch: int, headers: dict = {}, params: dict = {}) -> tuple[list[dict[str, Any]], int]:
resp = await self.request("GET", url, headers=headers, params={"page": page, "per_page": per_fetch} | params)
url = resp.links.get('last', {}).get('url')
if url:
return (await resp.json())['items'], page
else:
return (await resp.json())['items'], page
async def get_user(self, username: str):
return await self.request("GET", f"/users/{username}")
async def get_repository(self, owner: str, repo: str):
return await self.request("GET", f"/repos/{owner}/{repo}")
async def search(self, query: str, search_type: str = "repositories"):
return await self.request("GET", f"/search/{search_type}", params={"q": query})
async def get_repository_contents(self, owner: str, repo: str, path: str):
return await self.request("GET", f"/repos/{owner}/{repo}/contents/{path}")
async def get_repository_languages(self, owner: str, repo: str):
return await self.request("GET", f"/repos/{owner}/{repo}/languages")
async def get_gist(self, gist_id: str):
return await self.request("GET", f"/gists/{gist_id}")
async def get_latest_release(self, owner: str, repo: str):
return await self.request("GET", f"/repos/{owner}/{repo}/releases/latest")
async def get_release_by_tag(self, owner: str, repo: str, tag: str):
return await self.request("GET", f"/repos/{owner}/{repo}/releases/tags/{tag}")
async def get_commit(self, owner: str, repo: str, sha: str):
return await self.request("GET", f"/repos/{owner}/{repo}/commits/{sha}")
async def get_pull_request(self, owner: str, repo: str, pr_number: int):
return await self.request("GET", f"/repos/{owner}/{repo}/pulls/{pr_number}")
async def get_issue(self, owner: str, repo: str, issue_number: int):
return await self.request("GET", f"/repos/{owner}/{repo}/issues/{issue_number}")
async def get_rate_limit(self):
return await self.request("GET", "/rate_limit")