diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b6aa6d7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +github_repos.db +github_repos.db-shm +github_repos.db-wal +__pycache__/ +venv/ \ No newline at end of file diff --git a/crawler.py b/crawler.py index ab0b407..f675daa 100755 --- a/crawler.py +++ b/crawler.py @@ -5,17 +5,14 @@ import subprocess import time from datetime import datetime, timedelta, timezone -from typing import Callable, List +from typing import List import requests from db import RepoDB TOKEN = "replace this" -MIN_STARS = 1_000 -DB_PATH = "github_repos.db" -LIVE_DB_PATH = "repos.db" -PM2_APP_NAME = "git_leaderboard" + GRAPHQL_QUERY = """ query($queryString: String!, $cursor: String) { rateLimit { @@ -126,163 +123,128 @@ def execute_query(self, query: str, cursor: str = None): raise Exception("Max retries exceeded.") - -def deploy_site(crawled_db_path: str): - log("Preparing deployment...") - - db = RepoDB(crawled_db_path) - - row = db.conn.execute("SELECT COUNT(*) AS cnt FROM repo_latest").fetchone() - total_repos = int(row["cnt"]) if row else 0 - formatted_total = "{:,}".format(total_repos) - - lang_rows = db.conn.execute("SELECT name FROM language ORDER BY name LIMIT 5000").fetchall() - languages = [str(r["name"]) for r in lang_rows] - - topic_sql = """ - SELECT t.name, COUNT(rtl.repo_id) AS cnt - FROM topic t - JOIN repo_topic_latest rtl ON rtl.topic_id = t.id - GROUP BY t.id - ORDER BY cnt DESC - LIMIT 500 - """ - topic_rows = db.conn.execute(topic_sql).fetchall() - topics = [{"name": str(r["name"]), "count": int(r["cnt"])} for r in topic_rows] - - db.close() - - log("Stopping PM2 service...") - try: - subprocess.run(["pm2", "stop", PM2_APP_NAME], check=False, stdout=subprocess.DEVNULL) - except Exception as e: - log(f"Warning: PM2 stop failed (might not be running): {e}") - - log("Swapping Database...") - if os.path.exists(crawled_db_path): - if os.path.exists(LIVE_DB_PATH): - os.remove(LIVE_DB_PATH) - shutil.copy2(crawled_db_path, LIVE_DB_PATH) - - index_path = "index.html" - if os.path.exists(index_path): - with open(index_path, "r", encoding="utf-8") as f: - html_content = f.read() - - html_content = re.sub(r'(id="totalRepos"[^>]*>).*?(\w+>)', f"\\g<1>{formatted_total}\\g<2>", html_content) - - with open(index_path, "w", encoding="utf-8") as f: - f.write(html_content) - log(f"Updated {index_path} with {formatted_total} repos.") - - app_js_path = "static/app.js" - if os.path.exists(app_js_path): - with open(app_js_path, "r", encoding="utf-8") as f: - js_content = f.read() - - static_data = {"languages": languages, "topics": topics} - injection_code1 = f"const STATIC_DATA = {json.dumps(static_data)};" - - if "const STATIC_DATA =" in js_content: - js_content = re.sub(r"const STATIC_DATA = \{.*?\};", injection_code1, js_content, flags=re.DOTALL) - - with open(app_js_path, "w", encoding="utf-8") as f: - f.write(js_content) - log(f"Updated {app_js_path} with static lists.") - - log("Restarting PM2 service...") - try: - subprocess.run(["pm2", "restart", PM2_APP_NAME], check=False, stdout=subprocess.DEVNULL) - except Exception as e: - log(f"Warning: PM2 restart failed (maybe not running): {e}") - log("Deployment complete.") - - -def crawl(): - gh = GithubGraphQL(TOKEN) - db = RepoDB(DB_PATH) - current_min_stars = MIN_STARS - total_fetched = 0 - - log(f"Starting crawl for repos with >= {MIN_STARS} stars...") - - while True: - search_query = f"stars:>={current_min_stars} sort:stars-asc" - log(f"Querying batch: '{search_query}'") - - cursor = None - batch_repos = [] - has_next_page = True - - # Max 1000 results allowed by GitHub - while has_next_page: - time.sleep(0.1) - data = gh.execute_query(search_query, cursor) - search_data = data["search"] - - nodes = search_data["nodes"] - if not nodes: - break - - batch_repos.extend(nodes) - total_fetched += len(nodes) - - log(f" Fetched {len(nodes)} items. Total: {total_fetched}. Last star count: {nodes[-1]['stargazerCount']}") - - page_info = search_data["pageInfo"] - has_next_page = page_info["hasNextPage"] - cursor = page_info["endCursor"] - - if len(batch_repos) >= 1000: - break - - if not batch_repos: - log("No more results found.") - break - - last_repo_stars = batch_repos[-1]["stargazerCount"] - - if last_repo_stars == current_min_stars: - current_min_stars += 1 - else: - current_min_stars = last_repo_stars - db.upsert_from_github_nodes(batch_repos) - db.close() - deploy_site(DB_PATH) - - -def run_at_hours(func: Callable, hours_list: List[int]): - now = datetime.now(tz=timezone.utc) - current_h = now.hour - - if current_h in hours_list: - minutes_remaining = 0 - else: - sorted_hours = sorted(hours_list) - next_hour = next((h for h in sorted_hours if h > current_h), sorted_hours[0]) - - target = now.replace(hour=next_hour, minute=0, second=0, microsecond=0) - if next_hour <= current_h: - target += timedelta(days=1) - - minutes_remaining = int((target - now).total_seconds() / 60) - - log(f"Scheduler started for hours: {hours_list}") - log(f"Next crawl will start in approximately {minutes_remaining} minutes.") - last_run_hour = -1 - - while True: - current_hour = datetime.now(tz=timezone.utc).hour - - if current_hour in hours_list and current_hour != last_run_hour: - func() - last_run_hour = current_hour - - if current_hour not in hours_list: - last_run_hour = -1 - - time.sleep(30) +class Crawler: + def __init__( + self, + token: str, + min_stars: int = 1_000, + db_path: str = "github_repos.db", + live_db_path: str = "repos.db", + pm2_app_name: str = "git_leaderboard", + ): + self.token = token + self.min_stars = min_stars + self.db_path = db_path + self.live_db_path = live_db_path + self.pm2_app_name = pm2_app_name + + self.gh = GithubGraphQL(self.token) + self.db = RepoDB(self.db_path) + + self.current_min_stars = self.min_stars + self.total_fetched = 0 + # used by upsert logic to avoid duplicate processing in a run + self._processed_repo_ids: set[int] = set() + + def log(self, *args, **kwargs): + """Simple timestamped logger""" + timestamp = datetime.now().strftime("[%d:%H:%S]") + print(timestamp, *args, **kwargs) + + def deploy_site(self): + self.log("Preparing deployment...") + + # reopen a fresh connection so we don't interfere with crawling + db = RepoDB(self.db_path) + + row = db.conn.execute("SELECT COUNT(*) AS cnt FROM repo_latest").fetchone() + total_repos = int(row["cnt"]) if row else 0 + formatted_total = "{:,}".format(total_repos) + + lang_rows = db.conn.execute("SELECT name FROM language ORDER BY name LIMIT 5000").fetchall() + languages = [str(r["name"]) for r in lang_rows] + + topic_sql = """ + SELECT t.name, COUNT(rtl.repo_id) AS cnt + FROM topic t + JOIN repo_topic_latest rtl ON rtl.topic_id = t.id + GROUP BY t.id + ORDER BY cnt DESC + LIMIT 500 + """ + topic_rows = db.conn.execute(topic_sql).fetchall() + topics = [{"name": str(r["name"]), "count": int(r["cnt"])} for r in topic_rows] + + db.close() + + self.log("Stopping PM2 service...") + try: + subprocess.run(["pm2", "stop", self.pm2_app_name], check=False, stdout=subprocess.DEVNULL) + except Exception as e: + self.log(f"Warning: PM2 stop failed (might not be running): {e}") + + self.log("Swapping Database...") + if os.path.exists(self.db_path): + if os.path.exists(self.live_db_path): + os.remove(self.live_db_path) + shutil.copy2(self.db_path, self.live_db_path) + + index_path = "index.html" + if os.path.exists(index_path): + with open(index_path, "r", encoding="utf-8") as f: + html_content = f.read() + + html_content = re.sub(r'(id="totalRepos"[^>]*>).*?(\w+>)', f"\\g<1>{formatted_total}\\g<2>", html_content) + + with open(index_path, "w", encoding="utf-8") as f: + f.write(html_content) + self.log(f"Updated {index_path} with {formatted_total} repos.") + + app_js_path = "static/app.js" + if os.path.exists(app_js_path): + with open(app_js_path, "r", encoding="utf-8") as f: + js_content = f.read() + + static_data = {"languages": languages, "topics": topics} + injection_code1 = f"const STATIC_DATA = {json.dumps(static_data)};" + + if "const STATIC_DATA =" in js_content: + js_content = re.sub(r"const STATIC_DATA = \{.*?\};", injection_code1, js_content, flags=re.DOTALL) + + with open(app_js_path, "w", encoding="utf-8") as f: + f.write(js_content) + self.log(f"Updated {app_js_path} with static lists.") + + self.log("Restarting PM2 service...") + try: + subprocess.run(["pm2", "restart", self.pm2_app_name], check=False, stdout=subprocess.DEVNULL) + except Exception as e: + self.log(f"Warning: PM2 restart failed (maybe not running): {e}") + self.log("Deployment complete.") + + def run_at_hours(self, hours: List[int]): + self.log(f"Starting crawler. Will run at hours: {hours}") + while True: + now = datetime.now() + if now.hour in hours: + self.log("Starting crawl cycle...") + try: + self.crawl_and_update() + self.deploy_site() + except Exception as e: + self.log(f"Error during crawl/deploy: {e}") + self.log("Cycle complete. Sleeping for 1 hour.") + time.sleep(3600) + else: + next_run = min((h for h in hours if h > now.hour), default=hours[0] + 24) + next_run_time = now.replace(hour=next_run % 24, minute=0, second=0, microsecond=0) + if next_run_time <= now: + next_run_time += timedelta(days=1) + sleep_seconds = (next_run_time - now).total_seconds() + self.log(f"Current hour {now.hour} not in target hours. Sleeping for {sleep_seconds/3600:.2f} hours until {next_run_time}.") + time.sleep(sleep_seconds) if __name__ == "__main__": - run_at_hours(crawl, [0, 6, 12, 18]) + crawler = Crawler(TOKEN) + crawler.run_at_hours([0, 6, 12, 18]) \ No newline at end of file diff --git a/db.py b/db.py index 3c9a219..9787b5d 100755 --- a/db.py +++ b/db.py @@ -3,51 +3,6 @@ from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple -def iso_to_unix(ts: Optional[str]) -> Optional[int]: - if not ts: - return None - dt = datetime.fromisoformat(ts.replace("Z", "+00:00")) - return int(dt.timestamp()) - - -def unix_to_iso(ts: Optional[int]) -> Optional[str]: - if ts is None: - return None - return datetime.fromtimestamp(int(ts), tz=timezone.utc).isoformat().replace("+00:00", "Z") - - -def chunks(seq: Sequence[Any], n: int) -> Iterable[Sequence[Any]]: - for i in range(0, len(seq), n): - yield seq[i : i + n] - - -def row_to_obj(row: sqlite3.Row) -> Dict[str, Any]: - topics_concat = row["topicsConcat"] - topics = [] if topics_concat is None else str(topics_concat).split("\x1f") - topics = [t for t in topics if t] - - res = { - "n": row["nameWithOwner"], - "g": None if row["globalRank"] is None else int(row["globalRank"]), - "s": int(row["stargazerCount"]), - "f": int(row["forkCount"]), - "w": int(row["watchersCount"]), - "d": None if row["diskUsage"] is None else int(row["diskUsage"]), - "a": row["description"], - "h": row["homepageUrl"], - "c": unix_to_iso(row["createdAtUnix"]), - # "u": unix_to_iso(row["updatedAtUnix"]), - "p": unix_to_iso(row["pushedAtUnix"]), - "i": bool(int(row["isArchived"])), - "l": row["primaryLanguage"], - "t": topics, - # "x": unix_to_iso(row["fetchedAtUnix"]), - } - if "newStars" in row.keys() and row["newStars"]: - res["ns"] = int(row["newStars"]) - return res - - def select_latest_base_sql(include_global_rank: bool = True, extra_select: str = "") -> str: rank_select = "gr.globalRank AS globalRank," if include_global_rank else "NULL AS globalRank," rank_join = ( @@ -105,6 +60,50 @@ def count_base_sql() -> str: class RepoDB: + @staticmethod + def iso_to_unix(ts: Optional[str]) -> Optional[int]: + if not ts: + return None + dt = datetime.fromisoformat(ts.replace("Z", "+00:00")) + return int(dt.timestamp()) + + @staticmethod + def unix_to_iso(ts: Optional[int]) -> Optional[str]: + if ts is None: + return None + return datetime.fromtimestamp(int(ts), tz=timezone.utc).isoformat().replace("+00:00", "Z") + + @staticmethod + def chunks(seq: Sequence[Any], n: int) -> Iterable[Sequence[Any]]: + for i in range(0, len(seq), n): + yield seq[i : i + n] + + @staticmethod + def row_to_obj(row: sqlite3.Row) -> Dict[str, Any]: + topics_concat = row["topicsConcat"] + topics = [] if topics_concat is None else str(topics_concat).split("\x1f") + topics = [t for t in topics if t] + + res = { + "n": row["nameWithOwner"], + "g": None if row["globalRank"] is None else int(row["globalRank"]), + "s": int(row["stargazerCount"]), + "f": int(row["forkCount"]), + "w": int(row["watchersCount"]), + "d": None if row["diskUsage"] is None else int(row["diskUsage"]), + "a": row["description"], + "h": row["homepageUrl"], + "c": RepoDB.unix_to_iso(row["createdAtUnix"]), + # "u": RepoDB.unix_to_iso(row["updatedAtUnix"]), + "p": RepoDB.unix_to_iso(row["pushedAtUnix"]), + "i": bool(int(row["isArchived"])), + "l": row["primaryLanguage"], + "t": topics, + # "x": RepoDB.unix_to_iso(row["fetchedAtUnix"]), + } + if "newStars" in row.keys() and row["newStars"]: + res["ns"] = int(row["newStars"]) + return res def __init__(self, path: str) -> None: self.conn = sqlite3.connect(path, check_same_thread=False) self.conn.row_factory = sqlite3.Row @@ -252,7 +251,7 @@ def _fetch_repo_ids(self, names: List[str]) -> Dict[str, int]: out: Dict[str, int] = {} if not names: return out - for chunk in chunks(names, 500): + for chunk in self.chunks(names, 500): q = "SELECT id, name_with_owner FROM repo WHERE name_with_owner IN ({})".format(",".join(["?"] * len(chunk))) for row in self.conn.execute(q, tuple(chunk)).fetchall(): out[str(row["name_with_owner"])] = int(row["id"]) @@ -262,7 +261,7 @@ def _fetch_latest_metrics(self, repo_ids: List[int]) -> Dict[int, sqlite3.Row]: out: Dict[int, sqlite3.Row] = {} if not repo_ids: return out - for chunk in chunks(repo_ids, 500): + for chunk in self.chunks(repo_ids, 500): q = "SELECT repo_id, history_start_run_id, stars, forks, watchers, disk_usage FROM repo_latest WHERE repo_id IN ({})".format(",".join(["?"] * len(chunk))) for row in self.conn.execute(q, tuple(chunk)).fetchall(): out[int(row["repo_id"])] = row @@ -301,7 +300,7 @@ def upsert_from_github_nodes(self, nodes: List[Dict[str, Any]]) -> None: repo_id = int(n["databaseId"]) name = n["nameWithOwner"] repo_ids.append(repo_id) - repo_rows.append((repo_id, name, iso_to_unix(n.get("createdAt")), n.get("description"), n.get("homepageUrl"))) + repo_rows.append((repo_id, name, self.iso_to_unix(n.get("createdAt")), n.get("description"), n.get("homepageUrl"))) with self.conn: conflict_params = [(row[1], row[0]) for row in repo_rows] @@ -360,8 +359,8 @@ def upsert_from_github_nodes(self, nodes: List[Dict[str, Any]]) -> None: disk_usage = n.get("diskUsage") disk_usage_i: Optional[int] = None if disk_usage is None else int(disk_usage) - updated_at = iso_to_unix(n.get("updatedAt")) - pushed_at = iso_to_unix(n.get("pushedAt")) + updated_at = self.iso_to_unix(n.get("updatedAt")) + pushed_at = self.iso_to_unix(n.get("pushedAt")) is_archived = 1 if bool(n.get("isArchived")) else 0 pl_name = None @@ -473,7 +472,7 @@ def upsert_from_github_nodes(self, nodes: List[Dict[str, Any]]) -> None: ) if topic_repo_ids_to_refresh: - for chunk in chunks(topic_repo_ids_to_refresh, 500): + for chunk in self.chunks(topic_repo_ids_to_refresh, 500): q = "DELETE FROM repo_topic_latest WHERE repo_id IN ({})".format(",".join(["?"] * len(chunk))) self.conn.execute(q, tuple(chunk)) @@ -492,7 +491,7 @@ def get_repo_latest(self, name_with_owner: str) -> Optional[Dict[str, Any]]: """ ) row = self.conn.execute(q, (name_with_owner,)).fetchone() - return None if row is None else row_to_obj(row) + return None if row is None else self.row_to_obj(row) def _prepare_filter_conditions( self, @@ -591,7 +590,7 @@ def trending_leaderboard( all_params.extend([page_size, int(offset)]) rows = self.conn.execute(sql, tuple(all_params)).fetchall() - return [row_to_obj(r) for r in rows] + return [self.row_to_obj(r) for r in rows] def leaderboard( self, @@ -650,7 +649,7 @@ def leaderboard( params.extend([page_size, int(offset)]) rows = self.conn.execute(sql, tuple(params)).fetchall() - return [row_to_obj(r) for r in rows] + return [self.row_to_obj(r) for r in rows] def get_global_rank(self, name_with_owner: str) -> Optional[int]: sql = """ @@ -693,8 +692,8 @@ def history_segments(self, name_with_owner: str, limit: int = 5000) -> List[Dict for r in rows: out.append( { - "startFetchedAt": unix_to_iso(r["startFetchedAtUnix"]), - "endFetchedAt": unix_to_iso(r["endFetchedAtUnix"]), + "startFetchedAt": self.unix_to_iso(r["startFetchedAtUnix"]), + "endFetchedAt": self.unix_to_iso(r["endFetchedAtUnix"]), "s": int(r["stars"]), "f": int(r["forks"]), "w": int(r["watchers"]), diff --git a/index.html b/index.html index e9afd4e..2d9a6d9 100755 --- a/index.html +++ b/index.html @@ -97,6 +97,6 @@