diff --git a/src/applypilot/apply/launcher.py b/src/applypilot/apply/launcher.py index 341a11a..fe8e276 100644 --- a/src/applypilot/apply/launcher.py +++ b/src/applypilot/apply/launcher.py @@ -27,22 +27,33 @@ from applypilot.database import get_connection from applypilot.apply import chrome, dashboard, prompt as prompt_mod from applypilot.apply.chrome import ( - launch_chrome, cleanup_worker, kill_all_chrome, - reset_worker_dir, cleanup_on_exit, _kill_process_tree, + launch_chrome, + cleanup_worker, + kill_all_chrome, + reset_worker_dir, + cleanup_on_exit, + _kill_process_tree, BASE_CDP_PORT, ) from applypilot.apply.dashboard import ( - init_worker, update_state, add_event, get_state, - render_full, get_totals, + init_worker, + update_state, + add_event, + get_state, + render_full, + get_totals, ) logger = logging.getLogger(__name__) + # Blocked sites loaded from config/sites.yaml def _load_blocked(): from applypilot.config import load_blocked_sites + return load_blocked_sites() + # How often to poll the DB when the queue is empty (seconds) POLL_INTERVAL = config.DEFAULTS["poll_interval"] @@ -63,6 +74,7 @@ def _load_blocked(): # MCP config # --------------------------------------------------------------------------- + def _make_mcp_config(cdp_port: int) -> dict: """Build MCP config dict for a specific CDP port.""" return { @@ -87,8 +99,8 @@ def _make_mcp_config(cdp_port: int) -> dict: # Database operations # --------------------------------------------------------------------------- -def acquire_job(target_url: str | None = None, min_score: int = 7, - worker_id: int = 0) -> dict | None: + +def acquire_job(target_url: str | None = None, min_score: int = 7, worker_id: int = 0) -> dict | None: """Atomically acquire the next job to apply to. Args: @@ -99,13 +111,16 @@ def acquire_job(target_url: str | None = None, min_score: int = 7, Returns: Job dict or None if the queue is empty. """ + import sqlite3 as _sqlite3 + conn = get_connection() try: conn.execute("BEGIN IMMEDIATE") if target_url: like = f"%{target_url.split('?')[0].rstrip('/')}%" - row = conn.execute(""" + row = conn.execute( + """ SELECT url, title, site, application_url, tailored_resume_path, fit_score, location, full_description, cover_letter_path FROM jobs @@ -113,7 +128,9 @@ def acquire_job(target_url: str | None = None, min_score: int = 7, AND tailored_resume_path IS NOT NULL AND apply_status != 'in_progress' LIMIT 1 - """, (target_url, target_url, like, like)).fetchone() + """, + (target_url, target_url, like, like), + ).fetchone() else: blocked_sites, blocked_patterns = _load_blocked() # Build parameterized filters to avoid SQL injection @@ -127,7 +144,8 @@ def acquire_job(target_url: str | None = None, min_score: int = 7, if blocked_patterns: url_clauses = " ".join(f"AND url NOT LIKE ?" 
for _ in blocked_patterns) params.extend(blocked_patterns) - row = conn.execute(f""" + row = conn.execute( + f""" SELECT url, title, site, application_url, tailored_resume_path, fit_score, location, full_description, cover_letter_path FROM jobs @@ -139,7 +157,9 @@ def acquire_job(target_url: str | None = None, min_score: int = 7, {url_clauses} ORDER BY fit_score DESC, url LIMIT 1 - """, [config.DEFAULTS["max_apply_attempts"]] + params).fetchone() + """, + [config.DEFAULTS["max_apply_attempts"]] + params, + ).fetchone() if not row: conn.rollback() @@ -147,6 +167,7 @@ def acquire_job(target_url: str | None = None, min_score: int = 7, # Skip manual ATS sites (unsolvable CAPTCHAs) from applypilot.config import is_manual_ats + apply_url = row["application_url"] or row["url"] if is_manual_ats(apply_url): conn.execute( @@ -158,41 +179,72 @@ def acquire_job(target_url: str | None = None, min_score: int = 7, return None now = datetime.now(timezone.utc).isoformat() - conn.execute(""" + conn.execute( + """ UPDATE jobs SET apply_status = 'in_progress', agent_id = ?, last_attempted_at = ? WHERE url = ? - """, (f"worker-{worker_id}", now, row["url"])) + """, + (f"worker-{worker_id}", now, row["url"]), + ) conn.commit() return dict(row) + except _sqlite3.OperationalError as e: + conn.rollback() + if "locked" in str(e).lower(): + logger.warning("DB locked in acquire_job, retrying after backoff...") + time.sleep(2) + return acquire_job(target_url, min_score, worker_id) + raise except Exception: conn.rollback() raise -def mark_result(url: str, status: str, error: str | None = None, - permanent: bool = False, duration_ms: int | None = None, - task_id: str | None = None) -> None: +def mark_result( + url: str, + status: str, + error: str | None = None, + permanent: bool = False, + duration_ms: int | None = None, + task_id: str | None = None, +) -> None: """Update a job's apply status in the database.""" conn = get_connection() now = datetime.now(timezone.utc).isoformat() if status == "applied": - conn.execute(""" + conn.execute( + """ UPDATE jobs SET apply_status = 'applied', applied_at = ?, apply_error = NULL, agent_id = NULL, apply_duration_ms = ?, apply_task_id = ? WHERE url = ? - """, (now, duration_ms, task_id, url)) + """, + (now, duration_ms, task_id, url), + ) else: - attempts = 99 if permanent else "COALESCE(apply_attempts, 0) + 1" - conn.execute(f""" - UPDATE jobs SET apply_status = ?, apply_error = ?, - apply_attempts = {attempts}, agent_id = NULL, - apply_duration_ms = ?, apply_task_id = ? - WHERE url = ? - """, (status, error or "unknown", duration_ms, task_id, url)) + if permanent: + conn.execute( + """ + UPDATE jobs SET apply_status = ?, apply_error = ?, + apply_attempts = 99, agent_id = NULL, + apply_duration_ms = ?, apply_task_id = ? + WHERE url = ? + """, + (status, error or "unknown", duration_ms, task_id, url), + ) + else: + conn.execute( + """ + UPDATE jobs SET apply_status = ?, apply_error = ?, + apply_attempts = COALESCE(apply_attempts, 0) + 1, agent_id = NULL, + apply_duration_ms = ?, apply_task_id = ? + WHERE url = ? 
+ """, + (status, error or "unknown", duration_ms, task_id, url), + ) conn.commit() @@ -210,8 +262,8 @@ def release_lock(url: str) -> None: # Utility modes (--gen, --mark-applied, --mark-failed, --reset-failed) # --------------------------------------------------------------------------- -def gen_prompt(target_url: str, min_score: int = 7, - model: str = "sonnet", worker_id: int = 0) -> Path | None: + +def gen_prompt(target_url: str, min_score: int = 7, model: str = "sonnet", worker_id: int = 0) -> Path | None: """Generate a prompt file and print the Claude CLI command for manual debugging. Returns: @@ -258,17 +310,23 @@ def mark_job(url: str, status: str, reason: str | None = None) -> None: conn = get_connection() now = datetime.now(timezone.utc).isoformat() if status == "applied": - conn.execute(""" + conn.execute( + """ UPDATE jobs SET apply_status = 'applied', applied_at = ?, apply_error = NULL, agent_id = NULL WHERE url = ? - """, (now, url)) + """, + (now, url), + ) else: - conn.execute(""" + conn.execute( + """ UPDATE jobs SET apply_status = 'failed', apply_error = ?, apply_attempts = 99, agent_id = NULL WHERE url = ? - """, (reason or "manual", url)) + """, + (reason or "manual", url), + ) conn.commit() @@ -294,8 +352,8 @@ def reset_failed() -> int: # Per-job execution # --------------------------------------------------------------------------- -def run_job(job: dict, port: int, worker_id: int = 0, - model: str = "sonnet", dry_run: bool = False) -> tuple[str, int]: + +def run_job(job: dict, port: int, worker_id: int = 0, model: str = "sonnet", dry_run: bool = False) -> tuple[str, int]: """Spawn a Claude Code session for one job application. Returns: @@ -324,12 +382,16 @@ def run_job(job: dict, port: int, worker_id: int = 0, # Build claude command cmd = [ "claude", - "--model", model, + "--model", + model, "-p", - "--mcp-config", str(mcp_config_path), - "--permission-mode", "bypassPermissions", + "--mcp-config", + str(mcp_config_path), + "--permission-mode", + "bypassPermissions", "--no-session-persistence", - "--disallowedTools", ( + "--disallowedTools", + ( "mcp__gmail__draft_email,mcp__gmail__modify_email," "mcp__gmail__delete_email,mcp__gmail__download_attachment," "mcp__gmail__batch_modify_emails,mcp__gmail__batch_delete_emails," @@ -339,19 +401,33 @@ def run_job(job: dict, port: int, worker_id: int = 0, "mcp__gmail__list_filters,mcp__gmail__get_filter," "mcp__gmail__delete_filter" ), - "--output-format", "stream-json", - "--verbose", "-", + "--output-format", + "stream-json", + "--verbose", + "-", ] env = os.environ.copy() env.pop("CLAUDECODE", None) env.pop("CLAUDE_CODE_ENTRYPOINT", None) + profile = config.load_profile() + pwd = profile.get("personal", {}).get("password", "") + if pwd: + env["APPLYPILOT_SITE_PASSWORD"] = pwd + worker_dir = reset_worker_dir(worker_id) - update_state(worker_id, status="applying", job_title=job["title"], - company=job.get("site", ""), score=job.get("fit_score", 0), - start_time=time.time(), actions=0, last_action="starting") + update_state( + worker_id, + status="applying", + job_title=job["title"], + company=job.get("site", ""), + score=job.get("fit_score", 0), + start_time=time.time(), + actions=0, + last_action="starting", + ) add_event(f"[W{worker_id}] Starting: {job['title'][:40]} @ {job.get('site', '')}") worker_log = config.LOG_DIR / f"worker-{worker_id}.log" @@ -424,9 +500,7 @@ def run_job(job: dict, port: int, worker_id: int = 0, lf.write(f" >> {desc}\n") ws = get_state(worker_id) cur_actions = ws.actions if ws else 0 - 
update_state(worker_id, - actions=cur_actions + 1, - last_action=desc[:35]) + update_state(worker_id, actions=cur_actions + 1, last_action=desc[:35]) elif msg_type == "result": stats = { "input_tokens": msg.get("usage", {}).get("input_tokens", 0), @@ -463,13 +537,12 @@ def run_job(job: dict, port: int, worker_id: int = 0, update_state(worker_id, total_cost=prev_cost + cost) def _clean_reason(s: str) -> str: - return re.sub(r'[*`"]+$', '', s).strip() + return re.sub(r'[*`"]+$', "", s).strip() for result_status in ["APPLIED", "EXPIRED", "CAPTCHA", "LOGIN_ISSUE"]: if f"RESULT:{result_status}" in output: add_event(f"[W{worker_id}] {result_status} ({elapsed}s): {job['title'][:30]}") - update_state(worker_id, status=result_status.lower(), - last_action=f"{result_status} ({elapsed}s)") + update_state(worker_id, status=result_status.lower(), last_action=f"{result_status} ({elapsed}s)") return result_status.lower(), duration_ms if "RESULT:FAILED" in output: @@ -477,19 +550,17 @@ def _clean_reason(s: str) -> str: if "RESULT:FAILED" in out_line: reason = ( out_line.split("RESULT:FAILED:")[-1].strip() - if ":" in out_line[out_line.index("FAILED") + 6:] + if ":" in out_line[out_line.index("FAILED") + 6 :] else "unknown" ) reason = _clean_reason(reason) PROMOTE_TO_STATUS = {"captcha", "expired", "login_issue"} if reason in PROMOTE_TO_STATUS: add_event(f"[W{worker_id}] {reason.upper()} ({elapsed}s): {job['title'][:30]}") - update_state(worker_id, status=reason, - last_action=f"{reason.upper()} ({elapsed}s)") + update_state(worker_id, status=reason, last_action=f"{reason.upper()} ({elapsed}s)") return reason, duration_ms add_event(f"[W{worker_id}] FAILED ({elapsed}s): {reason[:30]}") - update_state(worker_id, status="failed", - last_action=f"FAILED: {reason[:25]}") + update_state(worker_id, status="failed", last_action=f"FAILED: {reason[:25]}") return f"failed:{reason}", duration_ms return "failed:unknown", duration_ms @@ -520,12 +591,20 @@ def _clean_reason(s: str) -> str: # --------------------------------------------------------------------------- PERMANENT_FAILURES: set[str] = { - "expired", "captcha", "login_issue", - "not_eligible_location", "not_eligible_salary", - "already_applied", "account_required", - "not_a_job_application", "unsafe_permissions", - "unsafe_verification", "sso_required", - "site_blocked", "cloudflare_blocked", "blocked_by_cloudflare", + "expired", + "captcha", + "login_issue", + "not_eligible_location", + "not_eligible_salary", + "already_applied", + "account_required", + "not_a_job_application", + "unsafe_permissions", + "unsafe_verification", + "sso_required", + "site_blocked", + "cloudflare_blocked", + "blocked_by_cloudflare", } PERMANENT_PREFIXES: tuple[str, ...] = ("site_blocked", "cloudflare", "blocked_by") @@ -545,10 +624,16 @@ def _is_permanent_failure(result: str) -> bool: # Worker loop # --------------------------------------------------------------------------- -def worker_loop(worker_id: int = 0, limit: int = 1, - target_url: str | None = None, - min_score: int = 7, headless: bool = False, - model: str = "sonnet", dry_run: bool = False) -> tuple[int, int]: + +def worker_loop( + worker_id: int = 0, + limit: int = 1, + target_url: str | None = None, + min_score: int = 7, + headless: bool = False, + model: str = "sonnet", + dry_run: bool = False, +) -> tuple[int, int]: """Run jobs sequentially until limit is reached or queue is empty. 
Args: @@ -574,19 +659,16 @@ def worker_loop(worker_id: int = 0, limit: int = 1, if not continuous and jobs_done >= limit: break - update_state(worker_id, status="idle", job_title="", company="", - last_action="waiting for job", actions=0) + update_state(worker_id, status="idle", job_title="", company="", last_action="waiting for job", actions=0) - job = acquire_job(target_url=target_url, min_score=min_score, - worker_id=worker_id) + job = acquire_job(target_url=target_url, min_score=min_score, worker_id=worker_id) if not job: if not continuous: add_event(f"[W{worker_id}] Queue empty") update_state(worker_id, status="done", last_action="queue empty") break empty_polls += 1 - update_state(worker_id, status="idle", - last_action=f"polling ({empty_polls})") + update_state(worker_id, status="idle", last_action=f"polling ({empty_polls})") if empty_polls == 1: add_event(f"[W{worker_id}] Queue empty, polling every {POLL_INTERVAL}s...") # Use Event.wait for interruptible sleep @@ -601,8 +683,7 @@ def worker_loop(worker_id: int = 0, limit: int = 1, add_event(f"[W{worker_id}] Launching Chrome...") chrome_proc = launch_chrome(worker_id, port=port, headless=headless) - result, duration_ms = run_job(job, port=port, worker_id=worker_id, - model=model, dry_run=dry_run) + result, duration_ms = run_job(job, port=port, worker_id=worker_id, model=model, dry_run=dry_run) if result == "skipped": release_lock(job["url"]) @@ -611,16 +692,14 @@ def worker_loop(worker_id: int = 0, limit: int = 1, elif result == "applied": mark_result(job["url"], "applied", duration_ms=duration_ms) applied += 1 - update_state(worker_id, jobs_applied=applied, - jobs_done=applied + failed) + update_state(worker_id, jobs_applied=applied, jobs_done=applied + failed) else: reason = result.split(":", 1)[-1] if ":" in result else result - mark_result(job["url"], "failed", reason, - permanent=_is_permanent_failure(result), - duration_ms=duration_ms) + mark_result( + job["url"], "failed", reason, permanent=_is_permanent_failure(result), duration_ms=duration_ms + ) failed += 1 - update_state(worker_id, jobs_failed=failed, - jobs_done=applied + failed) + update_state(worker_id, jobs_failed=failed, jobs_done=applied + failed) except KeyboardInterrupt: release_lock(job["url"]) @@ -650,10 +729,18 @@ def worker_loop(worker_id: int = 0, limit: int = 1, # Main entry point (called from cli.py) # --------------------------------------------------------------------------- -def main(limit: int = 1, target_url: str | None = None, - min_score: int = 7, headless: bool = False, model: str = "sonnet", - dry_run: bool = False, continuous: bool = False, - poll_interval: int = 60, workers: int = 1) -> None: + +def main( + limit: int = 1, + target_url: str | None = None, + min_score: int = 7, + headless: bool = False, + model: str = "sonnet", + dry_run: bool = False, + continuous: bool = False, + poll_interval: int = 60, + workers: int = 1, +) -> None: """Launch the apply pipeline. 
Args: @@ -743,13 +830,11 @@ def _refresh(): if effective_limit: base = effective_limit // workers extra = effective_limit % workers - limits = [base + (1 if i < extra else 0) - for i in range(workers)] + limits = [base + (1 if i < extra else 0) for i in range(workers)] else: limits = [0] * workers # continuous mode - with ThreadPoolExecutor(max_workers=workers, - thread_name_prefix="apply-worker") as executor: + with ThreadPoolExecutor(max_workers=workers, thread_name_prefix="apply-worker") as executor: futures = { executor.submit( worker_loop, @@ -781,10 +866,7 @@ def _refresh(): live.update(render_full()) totals = get_totals() - console.print( - f"\n[bold]Done: {total_applied} applied, {total_failed} failed " - f"(${totals['cost']:.3f})[/bold]" - ) + console.print(f"\n[bold]Done: {total_applied} applied, {total_failed} failed (${totals['cost']:.3f})[/bold]") console.print(f"Logs: {config.LOG_DIR}") except KeyboardInterrupt: diff --git a/src/applypilot/apply/prompt.py b/src/applypilot/apply/prompt.py index 37c3790..684b2af 100644 --- a/src/applypilot/apply/prompt.py +++ b/src/applypilot/apply/prompt.py @@ -75,13 +75,15 @@ def _build_profile_summary(profile: dict) -> str: lines.append(f"Available: {avail.get('earliest_start_date', 'Immediately')}") # Standard responses - lines.extend([ - "Age 18+: Yes", - "Background Check: Yes", - "Felony: No", - "Previously Worked Here: No", - "How Heard: Online Job Board", - ]) + lines.extend( + [ + "Age 18+: Yes", + "Background Check: Yes", + "Felony: No", + "Previously Worked Here: No", + "How Heard: Online Job Board", + ] + ) # EEO lines.append(f"Gender: {eeo.get('gender', 'Decline to self-identify')}") @@ -174,7 +176,7 @@ def _build_screening_section(profile: dict) -> str: return f"""== SCREENING QUESTIONS (be strategic) == Hard facts -> answer truthfully from the profile. No guessing. This includes: - Location/relocation: lives in {city}, cannot relocate - - Work authorization: {work_auth.get('legally_authorized_to_work', 'see profile')} + - Work authorization: {work_auth.get("legally_authorized_to_work", "see profile")} - Citizenship, clearance, licenses, certifications: answer from profile only - Criminal/background: answer from profile only @@ -204,9 +206,11 @@ def _build_hard_rules(profile: dict) -> str: if permit_type: work_auth_rule = f"Work auth: {permit_type}. Sponsorship needed: {sponsorship}." - name_rule = f'Name: Legal name = {full_name}.' + name_rule = f"Name: Legal name = {full_name}." if preferred_name and preferred_name != full_name.split()[0]: - name_rule += f' Preferred name = {preferred_name}. Use "{display_name}" unless a field specifically says "legal name".' + name_rule += ( + f' Preferred name = {preferred_name}. Use "{display_name}" unless a field specifically says "legal name".' + ) return f"""== HARD RULES (never break these) == 1. Never lie about: citizenship, work authorization, criminal history, education credentials, security clearance, licenses. @@ -225,7 +229,7 @@ def _build_captcha_section() -> str: return f"""== CAPTCHA == You solve CAPTCHAs via the CapSolver REST API. No browser extension. You control the entire flow. 
-API key: {capsolver_key or 'NOT CONFIGURED — skip to MANUAL FALLBACK for all CAPTCHAs'} +API key: $CAPSOLVER_API_KEY env var ({"configured" if capsolver_key else "NOT CONFIGURED — skip to MANUAL FALLBACK for all CAPTCHAs"}) API base: https://api.capsolver.com CRITICAL RULE: When ANY CAPTCHA appears (hCaptcha, reCAPTCHA, Turnstile -- regardless of what it looks like visually), you MUST: @@ -300,22 +304,8 @@ def _build_captcha_section() -> str: --- CAPTCHA SOLVE --- Three steps: createTask -> poll -> inject. Do each as a separate browser_evaluate call. -STEP 1 -- CREATE TASK (copy this exactly, fill in the 3 placeholders): -browser_evaluate function: async () => {{{{ - const r = await fetch('https://api.capsolver.com/createTask', {{{{ - method: 'POST', - headers: {{{{'Content-Type': 'application/json'}}}}, - body: JSON.stringify({{{{ - clientKey: '{capsolver_key}', - task: {{{{ - type: 'TASK_TYPE', - websiteURL: 'PAGE_URL', - websiteKey: 'SITE_KEY' - }}}} - }}}}) - }}}}); - return await r.json(); -}}}} +STEP 1 -- CREATE TASK (use Bash curl, NOT browser_evaluate — keeps API key out of page context): +Bash command: curl -s -X POST https://api.capsolver.com/createTask -H 'Content-Type: application/json' -d '{{"clientKey":"'$CAPSOLVER_API_KEY'","task":{{"type":"TASK_TYPE","websiteURL":"PAGE_URL","websiteKey":"SITE_KEY"}}}}' TASK_TYPE values (use EXACTLY these strings): hcaptcha -> HCaptchaTaskProxyLess @@ -332,18 +322,8 @@ def _build_captcha_section() -> str: If errorId > 0 -> CAPTCHA SOLVE failed. Go to MANUAL FALLBACK. STEP 2 -- POLL (replace TASK_ID with the taskId from step 1): -Loop: browser_wait_for time: 3, then run: -browser_evaluate function: async () => {{{{ - const r = await fetch('https://api.capsolver.com/getTaskResult', {{{{ - method: 'POST', - headers: {{{{'Content-Type': 'application/json'}}}}, - body: JSON.stringify({{{{ - clientKey: '{capsolver_key}', - taskId: 'TASK_ID' - }}}}) - }}}}); - return await r.json(); -}}}} +Loop: wait 3 seconds, then run: +Bash command: curl -s -X POST https://api.capsolver.com/getTaskResult -H 'Content-Type: application/json' -d '{{"clientKey":"'$CAPSOLVER_API_KEY'","taskId":"TASK_ID"}}' - status "processing" -> wait 3s, poll again. Max 10 polls (30s). - status "ready" -> extract token: @@ -417,9 +397,7 @@ def _build_captcha_section() -> str: 4. All else fails -> Output RESULT:CAPTCHA.""" -def build_prompt(job: dict, tailored_resume: str, - cover_letter: str | None = None, - dry_run: bool = False) -> str: +def build_prompt(job: dict, tailored_resume: str, cover_letter: str | None = None, dry_run: bool = False) -> str: """Build the full instruction prompt for the apply agent. Loads the user profile and search config internally. All personal data @@ -500,6 +478,7 @@ def build_prompt(job: dict, tailored_resume: str, # SSO domains the agent cannot sign into (loaded from config/sites.yaml) from applypilot.config import load_blocked_sso + blocked_sso = load_blocked_sso() # Preferred display name @@ -516,10 +495,10 @@ def build_prompt(job: dict, tailored_resume: str, prompt = f"""You are an autonomous job application agent. Your ONE mission: get this candidate an interview. You have all the information and tools. Think strategically. Act decisively. Submit the application. 
== JOB == -URL: {job.get('application_url') or job['url']} -Title: {job['title']} -Company: {job.get('site', 'Unknown')} -Fit Score: {job.get('fit_score', 'N/A')}/10 +URL: {job.get("application_url") or job["url"]} +Title: {job["title"]} +Company: {job.get("site", "Unknown")} +Fit Score: {job.get("fit_score", "N/A")}/10 == FILES == Resume PDF (upload this): {pdf_path} @@ -562,13 +541,13 @@ def build_prompt(job: dict, tailored_resume: str, 2. browser_snapshot to read the page. Then run CAPTCHA DETECT (see CAPTCHA section). If a CAPTCHA is found, solve it before continuing. 3. LOCATION CHECK. Read the page for location info. If not eligible, output RESULT and stop. 4. Find and click the Apply button. If email-only (page says "email resume to X"): - - send_email with subject "Application for {job['title']} -- {display_name}", body = 2-3 sentence pitch + contact info, attach resume PDF: ["{pdf_path}"] + - send_email with subject "Application for {job["title"]} -- {display_name}", body = 2-3 sentence pitch + contact info, attach resume PDF: ["{pdf_path}"] - Output RESULT:APPLIED. Done. After clicking Apply: browser_snapshot. Run CAPTCHA DETECT -- many sites trigger CAPTCHAs right after the Apply click. If found, solve before continuing. 5. Login wall? - 5a. FIRST: check the URL. If you landed on {', '.join(blocked_sso)}, or any SSO/OAuth page -> STOP. Output RESULT:FAILED:sso_required. Do NOT try to sign in to Google/Microsoft/SSO. + 5a. FIRST: check the URL. If you landed on {", ".join(blocked_sso)}, or any SSO/OAuth page -> STOP. Output RESULT:FAILED:sso_required. Do NOT try to sign in to Google/Microsoft/SSO. 5b. Check for popups. Run browser_tabs action "list". If a new tab/window appeared (login popup), switch to it with browser_tabs action "select". Check the URL there too -- if it's SSO -> RESULT:FAILED:sso_required. - 5c. Regular login form (employer's own site)? Try sign in: {personal['email']} / {personal.get('password', '')} + 5c. Regular login form (employer's own site)? Try sign in: {personal["email"]}. For password, run: echo $APPLYPILOT_SITE_PASSWORD 5d. After clicking Login/Sign-in: run CAPTCHA DETECT. Login pages frequently have invisible CAPTCHAs that silently block form submissions. If found, solve it then retry login. 5e. Sign in failed? Try sign up with same email and password. 5f. Need email verification? Use search_emails + read_email to get the code. @@ -608,7 +587,7 @@ def build_prompt(job: dict, tailored_resume: str, - Dropdown won't fill? browser_click to open it, then browser_click the option. - Checkbox won't check via fill_form? Use browser_click on it instead. Snapshot to verify. - Phone field with country prefix: just type digits {phone_digits} -- Date fields: {datetime.now().strftime('%m/%d/%Y')} +- Date fields: {datetime.now().strftime("%m/%d/%Y")} - Validation errors after submit? Take BOTH snapshot AND screenshot. Snapshot shows text errors, screenshot shows red-highlighted fields. Fix all, retry. - Honeypot fields (hidden, "leave blank"): skip them. - Format-sensitive fields: read the placeholder text, match it exactly. 
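[Reviewer note -- not part of the patch] The CapSolver change above swaps the browser_evaluate fetch calls for Bash curl so the clientKey never enters page JavaScript, and the key now travels via the CAPSOLVER_API_KEY env var, matching the new APPLYPILOT_SITE_PASSWORD handling in launcher.py. For reference, a minimal Python sketch of the same createTask -> poll loop the prompt describes (assumes `requests` is installed; solve_captcha and the token-field handling are illustrative, not code from this diff):

# Sketch of the createTask -> getTaskResult flow the prompt now drives via
# Bash curl. Assumes requests is installed and CAPSOLVER_API_KEY is set,
# mirroring the env-var change above. Illustration only, not part of the patch.
import os
import time

import requests

API = "https://api.capsolver.com"


def solve_captcha(task_type: str, page_url: str, site_key: str) -> dict | None:
    key = os.environ["CAPSOLVER_API_KEY"]
    # STEP 1 -- createTask (task_type e.g. "HCaptchaTaskProxyLess")
    created = requests.post(f"{API}/createTask", json={
        "clientKey": key,
        "task": {"type": task_type, "websiteURL": page_url, "websiteKey": site_key},
    }).json()
    if created.get("errorId", 0) > 0:
        return None  # CAPTCHA SOLVE failed -> MANUAL FALLBACK, per the prompt
    # STEP 2 -- poll every 3s, max 10 polls (30s), same budget as the prompt
    for _ in range(10):
        time.sleep(3)
        result = requests.post(f"{API}/getTaskResult", json={
            "clientKey": key, "taskId": created["taskId"],
        }).json()
        if result.get("status") == "ready":
            # The token field name inside "solution" varies by task type;
            # the caller extracts it (assumption, not shown in this diff).
            return result["solution"]
        if result.get("status") != "processing":
            return None
    return None

The curl version in the prompt is the same three-step shape; moving it out of browser_evaluate is the security-relevant part of this hunk.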
diff --git a/src/applypilot/cli.py b/src/applypilot/cli.py index 6c8be91..61c753f 100644 --- a/src/applypilot/cli.py +++ b/src/applypilot/cli.py @@ -33,6 +33,7 @@ # Helpers # --------------------------------------------------------------------------- + def _bootstrap() -> None: """Common setup: load env, create dirs, init DB.""" from applypilot.config import load_env, ensure_dirs @@ -53,10 +54,13 @@ def _version_callback(value: bool) -> None: # Commands # --------------------------------------------------------------------------- + @app.callback() def main( version: bool = typer.Option( - False, "--version", "-V", + False, + "--version", + "-V", help="Show version and exit.", callback=_version_callback, is_eager=True, @@ -77,11 +81,7 @@ def init() -> None: def run( stages: Optional[list[str]] = typer.Argument( None, - help=( - "Pipeline stages to run. " - f"Valid: {', '.join(VALID_STAGES)}, all. " - "Defaults to 'all' if omitted." - ), + help=(f"Pipeline stages to run. Valid: {', '.join(VALID_STAGES)}, all. Defaults to 'all' if omitted."), ), min_score: int = typer.Option(7, "--min-score", help="Minimum fit score for tailor/cover stages."), workers: int = typer.Option(1, "--workers", "-w", help="Parallel threads for discovery/enrichment stages."), @@ -108,25 +108,20 @@ def run( # Validate stage names for s in stage_list: if s != "all" and s not in VALID_STAGES: - console.print( - f"[red]Unknown stage:[/red] '{s}'. " - f"Valid stages: {', '.join(VALID_STAGES)}, all" - ) + console.print(f"[red]Unknown stage:[/red] '{s}'. Valid stages: {', '.join(VALID_STAGES)}, all") raise typer.Exit(code=1) # Gate AI stages behind Tier 2 llm_stages = {"score", "tailor", "cover"} if any(s in stage_list for s in llm_stages) or "all" in stage_list: from applypilot.config import check_tier + check_tier(2, "AI scoring/tailoring") # Validate the --validation flag value valid_modes = ("strict", "normal", "lenient") if validation not in valid_modes: - console.print( - f"[red]Invalid --validation value:[/red] '{validation}'. " - f"Choose from: {', '.join(valid_modes)}" - ) + console.print(f"[red]Invalid --validation value:[/red] '{validation}'. Choose from: {', '.join(valid_modes)}") raise typer.Exit(code=1) result = run_pipeline( @@ -154,7 +149,9 @@ def apply( url: Optional[str] = typer.Option(None, "--url", help="Apply to a specific job URL."), gen: bool = typer.Option(False, "--gen", help="Generate prompt file for manual debugging instead of running."), mark_applied: Optional[str] = typer.Option(None, "--mark-applied", help="Manually mark a job URL as applied."), - mark_failed: Optional[str] = typer.Option(None, "--mark-failed", help="Manually mark a job URL as failed (provide URL)."), + mark_failed: Optional[str] = typer.Option( + None, "--mark-failed", help="Manually mark a job URL as failed (provide URL)." 
+ ), fail_reason: Optional[str] = typer.Option(None, "--fail-reason", help="Reason for --mark-failed."), reset_failed: bool = typer.Option(False, "--reset-failed", help="Reset all failed jobs for retry."), ) -> None: @@ -168,33 +165,37 @@ def apply( if mark_applied: from applypilot.apply.launcher import mark_job + mark_job(mark_applied, "applied") console.print(f"[green]Marked as applied:[/green] {mark_applied}") return if mark_failed: from applypilot.apply.launcher import mark_job + mark_job(mark_failed, "failed", reason=fail_reason) console.print(f"[yellow]Marked as failed:[/yellow] {mark_failed} ({fail_reason or 'manual'})") return if reset_failed: from applypilot.apply.launcher import reset_failed as do_reset + count = do_reset() console.print(f"[green]Reset {count} failed job(s) for retry.[/green]") return # --- Full apply mode --- - # Check 1: Tier 3 required (Claude Code CLI + Chrome) + console.print( + "[yellow]Security: Auto-apply runs with --permission-mode bypassPermissions. " + "Review generated prompts before use.[/yellow]" + ) + check_tier(3, "auto-apply") # Check 2: Profile exists if not _profile_path.exists(): - console.print( - "[red]Profile not found.[/red]\n" - "Run [bold]applypilot init[/bold] to create your profile first." - ) + console.print("[red]Profile not found.[/red]\nRun [bold]applypilot init[/bold] to create your profile first.") raise typer.Exit(code=1) # Check 3: Tailored resumes exist (skip for --gen with --url) @@ -212,6 +213,7 @@ def apply( if gen: from applypilot.apply.launcher import gen_prompt, BASE_CDP_PORT + target = url or "" if not target: console.print("[red]--gen requires --url to specify which job.[/red]") @@ -224,9 +226,7 @@ def apply( console.print(f"[green]Wrote prompt to:[/green] {prompt_file}") console.print(f"\n[bold]Run manually:[/bold]") console.print( - f" claude --model {model} -p " - f"--mcp-config {mcp_path} " - f"--permission-mode bypassPermissions < {prompt_file}" + f" claude --model {model} -p --mcp-config {mcp_path} --permission-mode bypassPermissions < {prompt_file}" ) return @@ -337,8 +337,13 @@ def doctor() -> None: """Check your setup and diagnose missing requirements.""" import shutil from applypilot.config import ( - load_env, PROFILE_PATH, RESUME_PATH, RESUME_PDF_PATH, - SEARCH_CONFIG_PATH, ENV_PATH, get_chrome_path, + load_env, + PROFILE_PATH, + RESUME_PATH, + RESUME_PDF_PATH, + SEARCH_CONFIG_PATH, + ENV_PATH, + get_chrome_path, ) load_env() @@ -373,13 +378,20 @@ def doctor() -> None: # jobspy (discovery dep installed separately) try: import jobspy # noqa: F401 + results.append(("python-jobspy", ok_mark, "Job board scraping available")) except ImportError: - results.append(("python-jobspy", warn_mark, - "pip install --no-deps python-jobspy && pip install pydantic tls-client requests markdownify regex")) + results.append( + ( + "python-jobspy", + warn_mark, + "pip install --no-deps python-jobspy && pip install pydantic tls-client requests markdownify regex", + ) + ) # --- Tier 2 checks --- import os + has_gemini = bool(os.environ.get("GEMINI_API_KEY")) has_openai = bool(os.environ.get("OPENAI_API_KEY")) has_local = bool(os.environ.get("LLM_URL")) @@ -392,8 +404,7 @@ def doctor() -> None: elif has_local: results.append(("LLM API key", ok_mark, f"Local: {os.environ.get('LLM_URL')}")) else: - results.append(("LLM API key", fail_mark, - "Set GEMINI_API_KEY in ~/.applypilot/.env (run 'applypilot init')")) + results.append(("LLM API key", fail_mark, "Set GEMINI_API_KEY in ~/.applypilot/.env (run 'applypilot init')")) # --- Tier 3 
checks --- # Claude Code CLI @@ -401,32 +412,32 @@ def doctor() -> None: if claude_bin: results.append(("Claude Code CLI", ok_mark, claude_bin)) else: - results.append(("Claude Code CLI", fail_mark, - "Install from https://claude.ai/code (needed for auto-apply)")) + results.append(("Claude Code CLI", fail_mark, "Install from https://claude.ai/code (needed for auto-apply)")) # Chrome try: chrome_path = get_chrome_path() results.append(("Chrome/Chromium", ok_mark, chrome_path)) except FileNotFoundError: - results.append(("Chrome/Chromium", fail_mark, - "Install Chrome or set CHROME_PATH env var (needed for auto-apply)")) + results.append( + ("Chrome/Chromium", fail_mark, "Install Chrome or set CHROME_PATH env var (needed for auto-apply)") + ) # Node.js / npx (for Playwright MCP) npx_bin = shutil.which("npx") if npx_bin: results.append(("Node.js (npx)", ok_mark, npx_bin)) else: - results.append(("Node.js (npx)", fail_mark, - "Install Node.js 18+ from nodejs.org (needed for auto-apply)")) + results.append(("Node.js (npx)", fail_mark, "Install Node.js 18+ from nodejs.org (needed for auto-apply)")) # CapSolver (optional) capsolver = os.environ.get("CAPSOLVER_API_KEY") if capsolver: results.append(("CapSolver API key", ok_mark, "CAPTCHA solving enabled")) else: - results.append(("CapSolver API key", "[dim]optional[/dim]", - "Set CAPSOLVER_API_KEY in .env for CAPTCHA solving")) + results.append( + ("CapSolver API key", "[dim]optional[/dim]", "Set CAPSOLVER_API_KEY in .env for CAPTCHA solving") + ) # --- Render results --- console.print() @@ -441,6 +452,7 @@ def doctor() -> None: # Tier summary from applypilot.config import get_tier, TIER_LABELS + tier = get_tier() console.print(f"[bold]Current tier: Tier {tier} — {TIER_LABELS[tier]}[/bold]") diff --git a/src/applypilot/config.py b/src/applypilot/config.py index 8c39780..452eb53 100644 --- a/src/applypilot/config.py +++ b/src/applypilot/config.py @@ -44,7 +44,8 @@ def get_chrome_path() -> str: if system == "Windows": candidates = [ Path(os.environ.get("PROGRAMFILES", r"C:\Program Files")) / "Google/Chrome/Application/chrome.exe", - Path(os.environ.get("PROGRAMFILES(X86)", r"C:\Program Files (x86)")) / "Google/Chrome/Application/chrome.exe", + Path(os.environ.get("PROGRAMFILES(X86)", r"C:\Program Files (x86)")) + / "Google/Chrome/Application/chrome.exe", Path(os.environ.get("LOCALAPPDATA", "")) / "Google/Chrome/Application/chrome.exe", ] elif system == "Darwin": @@ -69,9 +70,7 @@ def get_chrome_path() -> str: if found: return found - raise FileNotFoundError( - "Chrome/Chromium not found. Install Chrome or set CHROME_PATH environment variable." - ) + raise FileNotFoundError("Chrome/Chromium not found. Install Chrome or set CHROME_PATH environment variable.") def get_chrome_user_data() -> Path: @@ -94,28 +93,29 @@ def ensure_dirs(): def load_profile() -> dict: """Load user profile from ~/.applypilot/profile.json.""" import json + if not PROFILE_PATH.exists(): - raise FileNotFoundError( - f"Profile not found at {PROFILE_PATH}. Run `applypilot init` first." - ) + raise FileNotFoundError(f"Profile not found at {PROFILE_PATH}. 
Run `applypilot init` first.") return json.loads(PROFILE_PATH.read_text(encoding="utf-8")) def load_search_config() -> dict: """Load search configuration from ~/.applypilot/searches.yaml.""" import yaml + if not SEARCH_CONFIG_PATH.exists(): # Fall back to package-shipped example example = CONFIG_DIR / "searches.example.yaml" if example.exists(): - return yaml.safe_load(example.read_text(encoding="utf-8")) + return yaml.safe_load(example.read_text(encoding="utf-8")) or {} return {} - return yaml.safe_load(SEARCH_CONFIG_PATH.read_text(encoding="utf-8")) + return yaml.safe_load(SEARCH_CONFIG_PATH.read_text(encoding="utf-8")) or {} def load_sites_config() -> dict: """Load sites.yaml configuration (sites list, manual_ats, blocked, etc.).""" import yaml + path = CONFIG_DIR / "sites.yaml" if not path.exists(): return {} @@ -174,6 +174,7 @@ def load_base_urls() -> dict[str, str | None]: def load_env(): """Load environment variables from ~/.applypilot/.env if it exists.""" from dotenv import load_dotenv + if ENV_PATH.exists(): load_dotenv(ENV_PATH) # Also try CWD .env as fallback @@ -235,6 +236,7 @@ def check_tier(required: int, feature: str) -> None: return from rich.console import Console + _console = Console(stderr=True) missing: list[str] = [] diff --git a/src/applypilot/database.py b/src/applypilot/database.py index a1779c0..7fd03d6 100644 --- a/src/applypilot/database.py +++ b/src/applypilot/database.py @@ -31,7 +31,7 @@ def get_connection(db_path: Path | str | None = None) -> sqlite3.Connection: """ path = str(db_path or DB_PATH) - if not hasattr(_local, 'connections'): + if not hasattr(_local, "connections"): _local.connections = {} conn = _local.connections.get(path) @@ -53,12 +53,27 @@ def get_connection(db_path: Path | str | None = None) -> sqlite3.Connection: def close_connection(db_path: Path | str | None = None) -> None: """Close the cached connection for the current thread.""" path = str(db_path or DB_PATH) - if hasattr(_local, 'connections'): + if hasattr(_local, "connections"): conn = _local.connections.pop(path, None) if conn is not None: conn.close() +def close_all_connections() -> None: + """Close all cached connections for the current thread. + + Call this in thread cleanup (e.g. ThreadPoolExecutor workers) to ensure + SQLite connections are properly released. + """ + if hasattr(_local, "connections"): + for _path, conn in list(_local.connections.items()): + try: + conn.close() + except Exception: + pass + _local.connections.clear() + + def init_db(db_path: Path | str | None = None) -> sqlite3.Connection: """Create the full jobs table with all columns from every pipeline stage. 
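[Reviewer note -- not part of the patch] close_all_connections() above only helps if each worker thread actually calls it on the way out, as its docstring suggests for ThreadPoolExecutor workers. A minimal sketch of that wiring (hypothetical worker_entry wrapper; the executor submission in launcher.py currently calls worker_loop directly):

# Hypothetical wrapper showing where close_all_connections() would run so
# each apply-worker thread releases its cached SQLite handles on exit.
from applypilot.apply.launcher import worker_loop
from applypilot.database import close_all_connections


def worker_entry(worker_id: int, limit: int) -> tuple[int, int]:
    try:
        return worker_loop(worker_id=worker_id, limit=limit)
    finally:
        # Runs on normal return, queue-empty exit, or exception.
        close_all_connections()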
@@ -243,32 +258,23 @@ def get_stats(conn: sqlite3.Connection | None = None) -> dict: stats["total"] = conn.execute("SELECT COUNT(*) FROM jobs").fetchone()[0] # By site breakdown - rows = conn.execute( - "SELECT site, COUNT(*) as cnt FROM jobs GROUP BY site ORDER BY cnt DESC" - ).fetchall() + rows = conn.execute("SELECT site, COUNT(*) as cnt FROM jobs GROUP BY site ORDER BY cnt DESC").fetchall() stats["by_site"] = [(row[0], row[1]) for row in rows] # Enrichment stage - stats["pending_detail"] = conn.execute( - "SELECT COUNT(*) FROM jobs WHERE detail_scraped_at IS NULL" - ).fetchone()[0] + stats["pending_detail"] = conn.execute("SELECT COUNT(*) FROM jobs WHERE detail_scraped_at IS NULL").fetchone()[0] - stats["with_description"] = conn.execute( - "SELECT COUNT(*) FROM jobs WHERE full_description IS NOT NULL" - ).fetchone()[0] + stats["with_description"] = conn.execute("SELECT COUNT(*) FROM jobs WHERE full_description IS NOT NULL").fetchone()[ + 0 + ] - stats["detail_errors"] = conn.execute( - "SELECT COUNT(*) FROM jobs WHERE detail_error IS NOT NULL" - ).fetchone()[0] + stats["detail_errors"] = conn.execute("SELECT COUNT(*) FROM jobs WHERE detail_error IS NOT NULL").fetchone()[0] # Scoring stage - stats["scored"] = conn.execute( - "SELECT COUNT(*) FROM jobs WHERE fit_score IS NOT NULL" - ).fetchone()[0] + stats["scored"] = conn.execute("SELECT COUNT(*) FROM jobs WHERE fit_score IS NOT NULL").fetchone()[0] stats["unscored"] = conn.execute( - "SELECT COUNT(*) FROM jobs " - "WHERE full_description IS NOT NULL AND fit_score IS NULL" + "SELECT COUNT(*) FROM jobs WHERE full_description IS NOT NULL AND fit_score IS NULL" ).fetchone()[0] # Score distribution @@ -280,9 +286,7 @@ def get_stats(conn: sqlite3.Connection | None = None) -> dict: stats["score_distribution"] = [(row[0], row[1]) for row in dist_rows] # Tailoring stage - stats["tailored"] = conn.execute( - "SELECT COUNT(*) FROM jobs WHERE tailored_resume_path IS NOT NULL" - ).fetchone()[0] + stats["tailored"] = conn.execute("SELECT COUNT(*) FROM jobs WHERE tailored_resume_path IS NOT NULL").fetchone()[0] stats["untailored_eligible"] = conn.execute( "SELECT COUNT(*) FROM jobs " @@ -291,9 +295,7 @@ def get_stats(conn: sqlite3.Connection | None = None) -> dict: ).fetchone()[0] stats["tailor_exhausted"] = conn.execute( - "SELECT COUNT(*) FROM jobs " - "WHERE COALESCE(tailor_attempts, 0) >= 5 " - "AND tailored_resume_path IS NULL" + "SELECT COUNT(*) FROM jobs WHERE COALESCE(tailor_attempts, 0) >= 5 AND tailored_resume_path IS NULL" ).fetchone()[0] # Cover letter stage @@ -308,13 +310,9 @@ def get_stats(conn: sqlite3.Connection | None = None) -> dict: ).fetchone()[0] # Application stage - stats["applied"] = conn.execute( - "SELECT COUNT(*) FROM jobs WHERE applied_at IS NOT NULL" - ).fetchone()[0] + stats["applied"] = conn.execute("SELECT COUNT(*) FROM jobs WHERE applied_at IS NOT NULL").fetchone()[0] - stats["apply_errors"] = conn.execute( - "SELECT COUNT(*) FROM jobs WHERE apply_error IS NOT NULL" - ).fetchone()[0] + stats["apply_errors"] = conn.execute("SELECT COUNT(*) FROM jobs WHERE apply_error IS NOT NULL").fetchone()[0] stats["ready_to_apply"] = conn.execute( "SELECT COUNT(*) FROM jobs " @@ -326,8 +324,7 @@ def get_stats(conn: sqlite3.Connection | None = None) -> dict: return stats -def store_jobs(conn: sqlite3.Connection, jobs: list[dict], - site: str, strategy: str) -> tuple[int, int]: +def store_jobs(conn: sqlite3.Connection, jobs: list[dict], site: str, strategy: str) -> tuple[int, int]: """Store discovered jobs, skipping duplicates by 
URL. Args: @@ -343,7 +340,7 @@ def store_jobs(conn: sqlite3.Connection, jobs: list[dict], new = 0 existing = 0 - for job in jobs: + for i, job in enumerate(jobs): url = job.get("url") if not url: continue @@ -351,21 +348,32 @@ def store_jobs(conn: sqlite3.Connection, jobs: list[dict], conn.execute( "INSERT INTO jobs (url, title, salary, description, location, site, strategy, discovered_at) " "VALUES (?, ?, ?, ?, ?, ?, ?, ?)", - (url, job.get("title"), job.get("salary"), job.get("description"), - job.get("location"), site, strategy, now), + ( + url, + job.get("title"), + job.get("salary"), + job.get("description"), + job.get("location"), + site, + strategy, + now, + ), ) new += 1 except sqlite3.IntegrityError: existing += 1 + # Periodic commit for crash safety + if (i + 1) % 50 == 0: + conn.commit() + conn.commit() return new, existing -def get_jobs_by_stage(conn: sqlite3.Connection | None = None, - stage: str = "discovered", - min_score: int | None = None, - limit: int = 100) -> list[dict]: +def get_jobs_by_stage( + conn: sqlite3.Connection | None = None, stage: str = "discovered", min_score: int | None = None, limit: int = 100 +) -> list[dict]: """Fetch jobs filtered by pipeline stage. Args: @@ -391,10 +399,7 @@ def get_jobs_by_stage(conn: sqlite3.Connection | None = None, "AND tailored_resume_path IS NULL AND COALESCE(tailor_attempts, 0) < 5" ), "tailored": "tailored_resume_path IS NOT NULL", - "pending_apply": ( - "tailored_resume_path IS NOT NULL AND applied_at IS NULL " - "AND application_url IS NOT NULL" - ), + "pending_apply": ("tailored_resume_path IS NOT NULL AND applied_at IS NULL AND application_url IS NOT NULL"), "applied": "applied_at IS NOT NULL", } diff --git a/src/applypilot/discovery/jobspy.py b/src/applypilot/discovery/jobspy.py index b5e54ff..60c25f4 100644 --- a/src/applypilot/discovery/jobspy.py +++ b/src/applypilot/discovery/jobspy.py @@ -20,8 +20,16 @@ log = logging.getLogger(__name__) +def _clean(val) -> str | None: + if val is None: + return None + s = str(val) + return s if s and s != "nan" else None + + # -- Proxy parsing ----------------------------------------------------------- + def parse_proxy(proxy_str: str) -> dict: """Parse host:port:user:pass into components.""" parts = proxy_str.split(":") @@ -50,14 +58,12 @@ def parse_proxy(proxy_str: str) -> dict: "playwright": {"server": f"http://{host}:{port}"}, } else: - raise ValueError( - f"Proxy format not recognized: {proxy_str}. " - f"Expected: host:port:user:pass or host:port" - ) + raise ValueError(f"Proxy format not recognized: {proxy_str}. Expected: host:port:user:pass or host:port") # -- Retry wrapper ----------------------------------------------------------- + def _scrape_with_retry(kwargs: dict, max_retries: int = 2, backoff: float = 5.0): """Call scrape_jobs with retry on transient failures.""" for attempt in range(max_retries + 1): @@ -76,6 +82,7 @@ def _scrape_with_retry(kwargs: dict, max_retries: int = 2, backoff: float = 5.0) # -- Location filtering ------------------------------------------------------ + def _load_location_config(search_cfg: dict) -> tuple[list[str], list[str]]: """Extract accept/reject location lists from search config. @@ -117,6 +124,7 @@ def _location_ok(location: str | None, accept: list[str], reject: list[str]) -> # -- DB storage (JobSpy DataFrame -> SQLite) --------------------------------- + def store_jobspy_results(conn: sqlite3.Connection, df, source_label: str) -> tuple[int, int]: """Store JobSpy DataFrame results into the DB. 
Returns (new, existing).""" now = datetime.now(timezone.utc).isoformat() @@ -128,25 +136,24 @@ def store_jobspy_results(conn: sqlite3.Connection, df, source_label: str) -> tup if not url or url == "nan": continue - title = str(row.get("title", "")) if str(row.get("title", "")) != "nan" else None - company = str(row.get("company", "")) if str(row.get("company", "")) != "nan" else None - location_str = str(row.get("location", "")) if str(row.get("location", "")) != "nan" else None + title = _clean(row.get("title")) + company = _clean(row.get("company")) + location_str = _clean(row.get("location")) - # Build salary string from min/max salary = None - min_amt = row.get("min_amount") - max_amt = row.get("max_amount") - interval = str(row.get("interval", "")) if str(row.get("interval", "")) != "nan" else "" - currency = str(row.get("currency", "")) if str(row.get("currency", "")) != "nan" else "" - if min_amt and str(min_amt) != "nan": - if max_amt and str(max_amt) != "nan": + min_amt = _clean(row.get("min_amount")) + max_amt = _clean(row.get("max_amount")) + interval = _clean(row.get("interval")) or "" + currency = _clean(row.get("currency")) or "" + if min_amt: + if max_amt: salary = f"{currency}{int(float(min_amt)):,}-{currency}{int(float(max_amt)):,}" else: salary = f"{currency}{int(float(min_amt)):,}" if interval: salary += f"/{interval}" - description = str(row.get("description", "")) if str(row.get("description", "")) != "nan" else None + description = _clean(row.get("description")) site_name = str(row.get("site", source_label)) is_remote = row.get("is_remote", False) @@ -164,15 +171,26 @@ def store_jobspy_results(conn: sqlite3.Connection, df, source_label: str) -> tup detail_scraped_at = now # Extract apply URL if JobSpy provided it - apply_url = str(row.get("job_url_direct", "")) if str(row.get("job_url_direct", "")) != "nan" else None + apply_url = _clean(row.get("job_url_direct")) try: conn.execute( "INSERT INTO jobs (url, title, salary, description, location, site, strategy, discovered_at, " "full_description, application_url, detail_scraped_at) " "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", - (url, title, salary, description, location_str, site_label, strategy, now, - full_description, apply_url, detail_scraped_at), + ( + url, + title, + salary, + description, + location_str, + site_label, + strategy, + now, + full_description, + apply_url, + detail_scraped_at, + ), ) new += 1 except sqlite3.IntegrityError: @@ -184,6 +202,7 @@ def store_jobspy_results(conn: sqlite3.Connection, df, source_label: str) -> tup # -- Single search execution ------------------------------------------------- + def _run_one_search( search: dict, sites: list[str], @@ -198,7 +217,7 @@ def _run_one_search( ) -> dict: """Run a single search query and store results in DB.""" s = search - label = f"\"{s['query']}\" in {s['location']} {'(remote)' if s.get('remote') else ''}" + label = f'"{s["query"]}" in {s["location"]} {"(remote)" if s.get("remote") else ""}' if "tier" in s: label += f" [tier {s['tier']}]" @@ -260,6 +279,7 @@ def _run_one_search( import pandas as pd import warnings + with warnings.catch_warnings(): warnings.simplefilter("ignore", FutureWarning) df = pd.concat(all_dfs, ignore_index=True) if len(all_dfs) > 1 else all_dfs[0] @@ -270,10 +290,16 @@ def _run_one_search( # Filter by location before storing before = len(df) - df = df[df.apply(lambda row: _location_ok( - str(row.get("location", "")) if str(row.get("location", "")) != "nan" else None, - accept_locs, reject_locs, - ), axis=1)] + df = df[ + 
df.apply( + lambda row: _location_ok( + str(row.get("location", "")) if str(row.get("location", "")) != "nan" else None, + accept_locs, + reject_locs, + ), + axis=1, + ) + ] filtered = before - len(df) conn = get_connection() @@ -289,6 +315,7 @@ def _run_one_search( # -- Single query search ----------------------------------------------------- + def search_jobs( query: str, location: str, @@ -305,7 +332,7 @@ def search_jobs( proxy_config = parse_proxy(proxy) if proxy else None - log.info("Search: \"%s\" in %s | sites=%s | remote=%s", query, location, sites, remote_only) + log.info('Search: "%s" in %s | sites=%s | remote=%s', query, location, sites, remote_only) kwargs = { "site_name": sites, @@ -357,6 +384,7 @@ def search_jobs( # -- Full crawl (all queries x all locations) -------------------------------- + def _full_crawl( search_cfg: dict, tiers: list[int] | None = None, @@ -386,18 +414,19 @@ def _full_crawl( searches = [] for q in queries: for loc in locs: - searches.append({ - "query": q["query"], - "location": loc["location"], - "remote": loc.get("remote", False), - "tier": q.get("tier", 0), - }) + searches.append( + { + "query": q["query"], + "location": loc["location"], + "remote": loc.get("remote", False), + "tier": q.get("tier", 0), + } + ) proxy_config = parse_proxy(proxy) if proxy else None log.info("Full crawl: %d search combinations", len(searches)) - log.info("Sites: %s | Results/site: %d | Hours old: %d", - ", ".join(sites), results_per_site, hours_old) + log.info("Sites: %s | Results/site: %d | Hours old: %d", ", ".join(sites), results_per_site, hours_old) # Ensure DB schema is ready init_db() @@ -409,9 +438,16 @@ def _full_crawl( for s in searches: result = _run_one_search( - s, sites, results_per_site, hours_old, - proxy_config, defaults, max_retries, - accept_locs, reject_locs, glassdoor_map, + s, + sites, + results_per_site, + hours_old, + proxy_config, + defaults, + max_retries, + accept_locs, + reject_locs, + glassdoor_map, ) completed += 1 total_new += result["new"] @@ -419,15 +455,26 @@ def _full_crawl( total_errors += result["errors"] if completed % 5 == 0 or completed == len(searches): - log.info("Progress: %d/%d queries done (%d new, %d dupes, %d errors)", - completed, len(searches), total_new, total_existing, total_errors) + log.info( + "Progress: %d/%d queries done (%d new, %d dupes, %d errors)", + completed, + len(searches), + total_new, + total_existing, + total_errors, + ) # Final stats conn = get_connection() db_total = conn.execute("SELECT COUNT(*) FROM jobs").fetchone()[0] - log.info("Full crawl complete: %d new | %d dupes | %d errors | %d total in DB", - total_new, total_existing, total_errors, db_total) + log.info( + "Full crawl complete: %d new | %d dupes | %d errors | %d total in DB", + total_new, + total_existing, + total_errors, + db_total, + ) return { "new": total_new, @@ -440,6 +487,7 @@ def _full_crawl( # -- Public entry point ------------------------------------------------------ + def run_discovery(cfg: dict | None = None) -> dict: """Main entry point for JobSpy-based job discovery. 
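[Reviewer note -- not part of the patch] The _clean() helper above replaces the repeated str(val) != "nan" checks: pandas missing values stringify to "nan", and the old code tested that inline for every column. Its behavior at a glance (example values are made up):

# Illustrates _clean() on the kinds of values a JobSpy DataFrame row yields.
# Example values are made up, not real scraper output.
import math


def _clean(val) -> str | None:
    if val is None:
        return None
    s = str(val)
    return s if s and s != "nan" else None


assert _clean(None) is None          # missing column
assert _clean(math.nan) is None      # pandas NaN -> str "nan"
assert _clean("") is None            # empty string
assert _clean(55000.0) == "55000.0"  # numeric cells survive as strings
assert _clean("Remote") == "Remote"

Note that min_amount/max_amount now come back as strings (or None), which is why the salary branch keeps the int(float(min_amt)) round-trip.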
diff --git a/src/applypilot/discovery/smartextract.py b/src/applypilot/discovery/smartextract.py index cf49a9a..5483fd9 100644 --- a/src/applypilot/discovery/smartextract.py +++ b/src/applypilot/discovery/smartextract.py @@ -48,6 +48,7 @@ # -- Location filtering ------------------------------------------------------- + def _load_location_filter(search_cfg: dict | None = None): """Load location accept/reject lists from search config.""" if search_cfg is None: @@ -75,6 +76,7 @@ def _location_ok(location: str | None, accept: list[str], reject: list[str]) -> # -- Site configuration from YAML -------------------------------------------- + def load_sites() -> list[dict]: """Load scraping target sites from config/sites.yaml.""" path = CONFIG_DIR / "sites.yaml" @@ -110,8 +112,16 @@ def _store_jobs_filtered( conn.execute( "INSERT INTO jobs (url, title, salary, description, location, site, strategy, discovered_at) " "VALUES (?, ?, ?, ?, ?, ?, ?, ?)", - (url, job.get("title"), job.get("salary"), job.get("description"), - job.get("location"), site, strategy, now), + ( + url, + job.get("title"), + job.get("salary"), + job.get("description"), + job.get("location"), + site, + strategy, + now, + ), ) new += 1 except sqlite3.IntegrityError: @@ -125,6 +135,7 @@ def _store_jobs_filtered( # -- Page intelligence collector --------------------------------------------- + def collect_page_intelligence(url: str, headless: bool = True) -> dict: """Load a page with Playwright and collect every signal a scraping engineer would look at in DevTools. Returns a structured intelligence report.""" @@ -152,12 +163,14 @@ def on_response(response): data = json.loads(body) except Exception: data = None - captured_responses.append({ - "url": rurl, - "status": response.status, - "size": len(body), - "data": data, - }) + captured_responses.append( + { + "url": rurl, + "status": response.status, + "size": len(body), + "data": data, + } + ) except Exception: pass @@ -166,8 +179,12 @@ def on_response(response): page = browser.new_page(user_agent=UA) page.on("response", on_response) - page.goto(url, timeout=60000) - page.wait_for_load_state("networkidle") + try: + page.goto(url, timeout=60000) + page.wait_for_load_state("networkidle") + except Exception: + browser.close() + raise intel["page_title"] = page.title() @@ -332,6 +349,7 @@ def _explore_nested(obj, path_prefix, depth=0): summary[f"nested_{path}"] = info elif isinstance(val, dict) and depth < 3: _explore_nested(val, path, depth + 1) + _explore_nested(data, "") intel["api_responses"].append(summary) @@ -397,8 +415,7 @@ def judge_api_responses(api_responses: list[dict]) -> list[dict]: verdict = extract_json(raw) is_relevant = verdict.get("relevant", False) reason = verdict.get("reason", "?") - log.info("Judge: %s -> %s (%s)", resp.get("url", "?")[:80], - "KEEP" if is_relevant else "DROP", reason) + log.info("Judge: %s -> %s (%s)", resp.get("url", "?")[:80], "KEEP" if is_relevant else "DROP", reason) if is_relevant: relevant.append(resp) except Exception as e: @@ -410,6 +427,7 @@ def judge_api_responses(api_responses: list[dict]) -> list[dict]: # -- Phase 1: strategy selection --------------------------------------------- + def format_strategy_briefing(intel: dict) -> str: """Lightweight briefing for strategy selection. 
No raw DOM.""" sections: list[str] = [] @@ -436,7 +454,9 @@ def format_strategy_briefing(intel: dict) -> str: sections.append(f"\nAPI RESPONSES INTERCEPTED: {len(intel['api_responses'])} calls") for resp in intel["api_responses"]: sections.append(f"\n URL: {resp['url']}") - sections.append(f" Status: {resp['status']} | Size: {resp['size']:,} chars | Type: {resp.get('type', '?')}") + sections.append( + f" Status: {resp['status']} | Size: {resp['size']:,} chars | Type: {resp.get('type', '?')}" + ) if "first_item_keys" in resp: sections.append(f" Item keys: {resp['first_item_keys']}") sections.append(f" Sample: {json.dumps(resp.get('first_item_sample', {}), indent=2)[:1000]}") @@ -454,7 +474,9 @@ def format_strategy_briefing(intel: dict) -> str: if "count" in sv: sections.append(f" .{arr_name}[0].{sub_name}: array of {sv['count']} items") sections.append(f" Item keys: {sv['first_item_keys']}") - sections.append(f" Sample: {json.dumps(sv.get('first_item_sample', {}), indent=2)[:1500]}") + sections.append( + f" Sample: {json.dumps(sv.get('first_item_sample', {}), indent=2)[:1500]}" + ) elif "keys" in sv: sections.append(f" .{arr_name}[0].{sub_name}: object with keys {sv['keys']}") sections.append(f" Sample: {json.dumps(sv.get('sample', {}), indent=2)[:1500]}") @@ -465,24 +487,28 @@ def format_strategy_briefing(intel: dict) -> str: if intel["data_testids"]: sections.append(f"\nDATA-TESTID ATTRIBUTES: {len(intel['data_testids'])} elements") for dt in intel["data_testids"][:15]: - text_preview = dt['text'].replace('\n', ' ')[:60] - sections.append(f" <{dt['tag']} data-testid=\"{dt['testid']}\"> {text_preview}") + text_preview = dt["text"].replace("\n", " ")[:60] + sections.append(f' <{dt["tag"]} data-testid="{dt["testid"]}"> {text_preview}') else: sections.append("\nDATA-TESTID: none found") # DOM stats stats = intel.get("dom_stats", {}) - sections.append(f"\nDOM STATS: {stats.get('total_elements', '?')} elements, " - f"{stats.get('links', '?')} links, {stats.get('headings', '?')} headings, " - f"{stats.get('tables', '?')} tables, {stats.get('articles', '?')} articles, " - f"{stats.get('has_data_ids', '?')} data-id elements") + sections.append( + f"\nDOM STATS: {stats.get('total_elements', '?')} elements, " + f"{stats.get('links', '?')} links, {stats.get('headings', '?')} headings, " + f"{stats.get('tables', '?')} tables, {stats.get('articles', '?')} articles, " + f"{stats.get('has_data_ids', '?')} data-id elements" + ) # Card candidates if intel["card_candidates"]: sections.append(f"\nREPEATING ELEMENTS DETECTED: {len(intel['card_candidates'])} candidate groups") for i, cand in enumerate(intel["card_candidates"]): - sections.append(f" [{i}] parent={cand['parent_selector']} child={cand['child_selector']} " - f"count={cand['total_children']} with_text={cand['with_text']} with_links={cand['with_links']}") + sections.append( + f" [{i}] parent={cand['parent_selector']} child={cand['child_selector']} " + f"count={cand['total_children']} with_text={cand['with_text']} with_links={cand['with_links']}" + ) else: sections.append("\nREPEATING ELEMENTS: none detected") @@ -525,8 +551,20 @@ def format_strategy_briefing(intel: dict) -> str: # -- Card HTML cleaning (allowlist approach) ---------------------------------- -_ALLOWED_ATTRS = {"id", "href", "data-testid", "data-id", "data-type", "data-slug", - "role", "aria-label", "aria-labelledby", "type", "name", "for"} +_ALLOWED_ATTRS = { + "id", + "href", + "data-testid", + "data-id", + "data-type", + "data-slug", + "role", + "aria-label", + "aria-labelledby", + 
"type", + "name", + "for", +} _ALLOWED_PREFIXES = ("data-", "aria-") _UTILITY_CLASS_RE = re.compile( r"^(" @@ -578,8 +616,7 @@ def clean_page_html(html: str, max_chars: int = 150_000) -> str: if main and len(str(main)) > 1000: soup = BeautifulSoup(str(main), "html.parser") - for tag in soup.find_all(["script", "style", "svg", "noscript", "iframe", - "link", "meta", "head", "footer", "nav"]): + for tag in soup.find_all(["script", "style", "svg", "noscript", "iframe", "link", "meta", "head", "footer", "nav"]): tag.decompose() for tag in soup.find_all(True): @@ -638,6 +675,7 @@ def clean_page_html(html: str, max_chars: int = 150_000) -> str: # -- LLM helpers ------------------------------------------------------------- + def ask_llm(prompt: str) -> tuple[str, float, dict]: """Send prompt to LLM. Returns (response_text, seconds_taken, metadata).""" client = get_client() @@ -663,7 +701,7 @@ def extract_json(text: str) -> dict: elif "```" in text: text = text.split("```")[1].split("```")[0] text = text.strip() - text = re.sub(r'\\([^"\\\/bfnrtu])', r'\1', text) + text = re.sub(r'\\([^"\\\/bfnrtu])', r"\1", text) try: return json.loads(text) except json.JSONDecodeError: @@ -678,6 +716,7 @@ def extract_json(text: str) -> dict: # -- JSON path resolution --------------------------------------------------- + def resolve_json_path_raw(data, path: str): """Navigate a JSON path and return whatever is there (including lists/dicts).""" if not path or not data: @@ -726,6 +765,7 @@ def resolve_json_path(data, path: str): # -- Extraction executors ---------------------------------------------------- + def execute_json_ld(intel: dict, plan: dict) -> list[dict]: """Extract jobs from JSON-LD JobPosting entries.""" ext = plan["extraction"] @@ -799,7 +839,7 @@ def execute_css_selectors(intel: dict) -> tuple[dict, list[dict]]: log.error("LLM_ERROR in Phase 2: %s", e) return {}, [] - log.info("Phase 2 LLM: %d chars, %.1fs", meta['response_chars'], elapsed) + log.info("Phase 2 LLM: %d chars, %.1fs", meta["response_chars"], elapsed) try: selectors = extract_json(raw) @@ -847,6 +887,7 @@ def execute_css_selectors(intel: dict) -> tuple[dict, list[dict]]: # -- Main per-site extraction ------------------------------------------------ + def _run_one_site(name: str, url: str) -> dict: """Run full smart extraction pipeline on one site URL.""" log.info("=" * 60) @@ -857,22 +898,38 @@ def _run_one_site(name: str, url: str) -> dict: t0 = time.time() intel = collect_page_intelligence(url) collect_time = time.time() - t0 - log.info("Done in %.1fs | JSON-LD: %d | API: %d | testids: %d | cards: %d", - collect_time, len(intel["json_ld"]), len(intel["api_responses"]), - len(intel["data_testids"]), len(intel["card_candidates"])) + log.info( + "Done in %.1fs | JSON-LD: %d | API: %d | testids: %d | cards: %d", + collect_time, + len(intel["json_ld"]), + len(intel["api_responses"]), + len(intel["data_testids"]), + len(intel["card_candidates"]), + ) # Headful retry if page content is tiny full_html = intel.get("full_html", "") cleaned_check = clean_page_html(full_html) if full_html else "" - _captcha_signals = ["captcha", "are you a human", "verify you", "unusual requests", - "access denied", "please verify", "bot detection"] + _captcha_signals = [ + "captcha", + "are you a human", + "verify you", + "unusual requests", + "access denied", + "please verify", + "bot detection", + ] _is_captcha = any(s in full_html.lower() for s in _captcha_signals) if full_html else False if len(cleaned_check) < 5000 and full_html and not _is_captcha: 
log.info("Cleaned HTML only %s chars -- retrying headful...", f"{len(cleaned_check):,}") intel = collect_page_intelligence(url, headless=False) collect_time = time.time() - t0 - log.info("Headful done in %.1fs | JSON-LD: %d | API: %d", - collect_time, len(intel["json_ld"]), len(intel["api_responses"])) + log.info( + "Headful done in %.1fs | JSON-LD: %d | API: %d", + collect_time, + len(intel["json_ld"]), + len(intel["api_responses"]), + ) elif _is_captcha: log.warning("CAPTCHA/rate-limit detected -- skipping headful retry") @@ -933,14 +990,23 @@ def _run_one_site(name: str, url: str) -> dict: urls = sum(1 for j in jobs if j.get("url")) salaries = sum(1 for j in jobs if j.get("salary")) descs = sum(1 for j in jobs if j.get("description")) - log.info("RESULT: %s -- %d jobs, %d titles, %d urls, %d salaries, %d descriptions", - status, total, titles, urls, salaries, descs) + log.info( + "RESULT: %s -- %d jobs, %d titles, %d urls, %d salaries, %d descriptions", + status, + total, + titles, + urls, + salaries, + descs, + ) for j in jobs[:3]: - log.info(" - %s | loc: %s | salary: %s", - str(j.get("title") or "?")[:55], - str(j.get("location") or "?")[:25], - str(j.get("salary") or "-")[:20]) + log.info( + " - %s | loc: %s | salary: %s", + str(j.get("title") or "?")[:55], + str(j.get("location") or "?")[:25], + str(j.get("salary") or "-")[:20], + ) return { "name": name, @@ -956,6 +1022,7 @@ def _run_one_site(name: str, url: str) -> dict: # -- Target building -------------------------------------------------------- + def build_scrape_targets( sites: list[dict] | None = None, search_cfg: dict | None = None, @@ -993,25 +1060,30 @@ def build_scrape_targets( expanded_url = expanded_url.replace("{query_encoded}", quote_plus(query)) expanded_url = expanded_url.replace("{query}", quote_plus(query)) expanded_url = expanded_url.replace("{location_encoded}", quote_plus(default_location)) - targets.append({ - "name": site_name, - "url": expanded_url, - "query": query, - }) + targets.append( + { + "name": site_name, + "url": expanded_url, + "query": query, + } + ) else: expanded_url = site_url expanded_url = expanded_url.replace("{location_encoded}", quote_plus(default_location)) - targets.append({ - "name": site_name, - "url": expanded_url, - "query": None, - }) + targets.append( + { + "name": site_name, + "url": expanded_url, + "query": None, + } + ) return targets # -- Run all sites ----------------------------------------------------------- + def _run_all( targets: list[dict], accept_locs: list[str], @@ -1025,8 +1097,9 @@ def _run_all( """ conn = init_db() pre_stats = get_stats(conn) - log.info("Database: %d jobs already stored, %d pending detail scrape", - pre_stats["total"], pre_stats["pending_detail"]) + log.info( + "Database: %d jobs already stored, %d pending detail scrape", pre_stats["total"], pre_stats["pending_detail"] + ) results: list[dict] = [] total_new = 0 @@ -1036,9 +1109,9 @@ def _process_result(r: dict, target: dict) -> None: nonlocal total_new, total_existing jobs = r.get("jobs", []) if jobs: - new, existing = _store_jobs_filtered(conn, jobs, target["name"], - r.get("strategy", "?"), - accept_locs, reject_locs) + new, existing = _store_jobs_filtered( + conn, jobs, target["name"], r.get("strategy", "?"), accept_locs, reject_locs + ) total_new += new total_existing += existing log.info("DB: +%d new, %d already existed", new, existing) @@ -1046,10 +1119,7 @@ def _process_result(r: dict, target: dict) -> None: if workers > 1 and len(targets) > 1: # Parallel mode with 

 # -- Run all sites -----------------------------------------------------------

+
 def _run_all(
     targets: list[dict],
     accept_locs: list[str],
@@ -1025,8 +1097,9 @@ def _run_all(
     """
     conn = init_db()
     pre_stats = get_stats(conn)
-    log.info("Database: %d jobs already stored, %d pending detail scrape",
-             pre_stats["total"], pre_stats["pending_detail"])
+    log.info(
+        "Database: %d jobs already stored, %d pending detail scrape", pre_stats["total"], pre_stats["pending_detail"]
+    )

     results: list[dict] = []
     total_new = 0
@@ -1036,9 +1109,9 @@ def _process_result(r: dict, target: dict) -> None:
         nonlocal total_new, total_existing
         jobs = r.get("jobs", [])
         if jobs:
-            new, existing = _store_jobs_filtered(conn, jobs, target["name"],
-                                                 r.get("strategy", "?"),
-                                                 accept_locs, reject_locs)
+            new, existing = _store_jobs_filtered(
+                conn, jobs, target["name"], r.get("strategy", "?"), accept_locs, reject_locs
+            )
             total_new += new
             total_existing += existing
             log.info("DB: +%d new, %d already existed", new, existing)
@@ -1046,10 +1119,7 @@ def _process_result(r: dict, target: dict) -> None:

     if workers > 1 and len(targets) > 1:
         # Parallel mode
         with ThreadPoolExecutor(max_workers=min(workers, len(targets))) as pool:
-            future_to_target = {
-                pool.submit(_run_one_site, target["name"], target["url"]): target
-                for target in targets
-            }
+            future_to_target = {pool.submit(_run_one_site, target["name"], target["url"]): target for target in targets}
             for future in as_completed(future_to_target):
                 target = future_to_target[future]
                 r = future.result()
@@ -1079,12 +1149,12 @@ def _process_result(r: dict, target: dict) -> None:
     passed = sum(1 for r in results if r["status"] == "PASS")
     log.info("%d/%d PASS", passed, len(results))

-    return {"total_new": total_new, "total_existing": total_existing,
-            "passed": passed, "total": len(results)}
+    return {"total_new": total_new, "total_existing": total_existing, "passed": passed, "total": len(results)}


 # -- Public entry point ------------------------------------------------------

+
 def run_smart_extract(
     sites: list[dict] | None = None,
     workers: int = 1,
@@ -1112,7 +1182,12 @@ def run_smart_extract(
     search_sites = sum(1 for s in (sites or load_sites()) if s.get("type") == "search")
     static_sites = sum(1 for s in (sites or load_sites()) if s.get("type") != "search")

-    log.info("Sites: %d searchable, %d static | Total targets: %d (workers=%d)",
-             search_sites, static_sites, len(targets), workers)
+    log.info(
+        "Sites: %d searchable, %d static | Total targets: %d (workers=%d)",
+        search_sites,
+        static_sites,
+        len(targets),
+        workers,
+    )

     return _run_all(targets, accept_locs, reject_locs, workers=workers)
diff --git a/src/applypilot/enrichment/detail.py b/src/applypilot/enrichment/detail.py
index 11b7926..551eeed 100644
--- a/src/applypilot/enrichment/detail.py
+++ b/src/applypilot/enrichment/detail.py
@@ -43,14 +43,17 @@ def set_proxy(proxy_str: str | None):
     global _PROXY_CONFIG
     if proxy_str:
         from applypilot.discovery.jobspy import parse_proxy
+
         _PROXY_CONFIG = parse_proxy(proxy_str)


 # -- URL resolution ----------------------------------------------------------

+
 def _load_base_urls() -> dict[str, str | None]:
     """Load site base URLs from config/sites.yaml."""
     from applypilot.config import load_base_urls
+
     return load_base_urls()
@@ -120,16 +123,13 @@ def resolve_all_urls(conn: sqlite3.Connection) -> dict:
             app_resolved += 1

     conn.commit()
-    return {"resolved": resolved, "failed": failed, "already_absolute": already_absolute,
-            "app_resolved": app_resolved}
+    return {"resolved": resolved, "failed": failed, "already_absolute": already_absolute, "app_resolved": app_resolved}


 def resolve_wttj_urls(conn: sqlite3.Connection) -> int:
     """Re-fetch WTTJ Algolia API to get proper detail URLs and fix slug-as-title.
     Returns count of URLs updated."""
-    wttj_jobs = conn.execute(
-        "SELECT url, title FROM jobs WHERE site = 'WelcomeToTheJungle'"
-    ).fetchall()
+    wttj_jobs = conn.execute("SELECT url, title FROM jobs WHERE site = 'WelcomeToTheJungle'").fetchall()
     if not wttj_jobs:
         return 0
@@ -205,6 +205,7 @@ def capture_algolia(response):

 # -- Detail page intelligence ------------------------------------------------

+
 def collect_detail_intelligence(page) -> dict:
     """Collect signals from a detail page. Lighter than discovery -- no API interception."""
     intel: dict = {"json_ld": [], "page_title": "", "final_url": ""}
@@ -224,6 +225,7 @@ def collect_detail_intelligence(page) -> dict:

 # -- Tier 1: JSON-LD extraction -----------------------------------------------

+
 def extract_from_json_ld(intel: dict) -> dict | None:
     """Extract description and apply URL from JSON-LD JobPosting.
     Returns {"full_description": str, "application_url": str|None} or None."""
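
# A condensed sketch of what Tier 1 relies on: job boards often embed a
# schema.org JobPosting object in a <script type="application/ld+json">
# block. The walk below (including the @graph case) is illustrative, not the
# exact body of find_job_posting from this patch.
def _find_job_posting_sketch(data):
    if isinstance(data, list):
        for item in data:
            found = _find_job_posting_sketch(item)
            if found:
                return found
    elif isinstance(data, dict):
        if data.get("@type") == "JobPosting":
            return data
        if "@graph" in data:
            return _find_job_posting_sketch(data["@graph"])
    return None
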
Returns {"full_description": str, "application_url": str|None} or None.""" @@ -283,9 +285,9 @@ def find_job_posting(data): 'a[class*="apply"]', 'a[aria-label*="pply"]', 'button[data-testid*="apply"]', - 'a#apply_button', - '.postings-btn-wrapper a', - 'a.ashby-job-posting-apply-button', + "a#apply_button", + ".postings-btn-wrapper a", + "a.ashby-job-posting-apply-button", '#grnhse_app a[href*="apply"]', 'a[data-qa="btn-apply"]', 'a[class*="btn-apply"]', @@ -294,29 +296,29 @@ def find_job_posting(data): ] DESCRIPTION_SELECTORS = [ - '#job-description', - '#job_description', - '#jobDescriptionText', - '.job-description', - '.job_description', + "#job-description", + "#job_description", + "#jobDescriptionText", + ".job-description", + ".job_description", '[class*="job-description"]', '[class*="jobDescription"]', '[data-testid*="description"]', '[data-testid="job-description"]', - '.posting-page .posting-categories + div', - '#content .posting-page', - '#app_body .content', - '#grnhse_app .content', - '.ashby-job-posting-description', + ".posting-page .posting-categories + div", + "#content .posting-page", + "#app_body .content", + "#grnhse_app .content", + ".ashby-job-posting-description", '[class*="posting-description"]', '[class*="job-detail"]', '[class*="jobDetail"]', '[class*="job-content"]', '[class*="job-body"]', '[role="main"] article', - 'main article', + "main article", 'article[class*="job"]', - '.job-posting-content', + ".job-posting-content", ] @@ -470,6 +472,7 @@ def extract_with_llm(page, url: str) -> dict: log.info("LLM: %d chars in, %.1fs", len(prompt), elapsed) from applypilot.discovery.smartextract import extract_json + result = extract_json(raw) desc = result.get("full_description") apply_url = result.get("application_url") @@ -485,6 +488,7 @@ def extract_with_llm(page, url: str) -> dict: # -- Description cleaning --------------------------------------------------- + def clean_description(text: str) -> str: """Convert HTML description to clean readable text.""" if not text: @@ -658,8 +662,15 @@ def scrape_site_batch( apply_str = "yes" if result.get("application_url") else "no" err_str = f" | err={result.get('error')}" if result.get("error") else "" - log.info(" %s | %s | desc=%s chars | apply=%s | %.1fs%s", - status, tier_str, f"{desc_len:,}", apply_str, elapsed, err_str) + log.info( + " %s | %s | desc=%s chars | apply=%s | %.1fs%s", + status, + tier_str, + f"{desc_len:,}", + apply_str, + elapsed, + err_str, + ) if status in ("ok", "partial"): stats[status] += 1 @@ -702,10 +713,10 @@ def _run_detail_scraper( Returns aggregate stats dict. """ - skip_filter = " AND ".join(f"site != '{s}'" for s in SKIP_DETAIL_SITES) - where = f"WHERE detail_scraped_at IS NULL AND {skip_filter}" + placeholders = ",".join("?" 
@@ -724,8 +735,12 @@ def _run_detail_scraper(
         log.info(" %s: %d jobs", site, len(jobs))

     known_order = [
-        "RemoteOK", "Job Bank Canada", "BuiltIn Remote",
-        "WelcomeToTheJungle", "CareerJet Canada", "Hacker News Jobs",
+        "RemoteOK",
+        "Job Bank Canada",
+        "BuiltIn Remote",
+        "WelcomeToTheJungle",
+        "CareerJet Canada",
+        "Hacker News Jobs",
     ]
     order = [s for s in known_order if s in site_jobs]
     order += [s for s in sorted(site_jobs.keys()) if s not in order]
@@ -746,9 +761,16 @@ def _scrape_site(site: str) -> dict:
         delay = SITE_DELAYS.get(site, 2.0)
         log.info("%s -- %d jobs (delay=%.1fs)", site, len(jobs), delay)
         stats = scrape_site_batch(None, site, jobs, delay=delay, max_jobs=max_per_site)
-        log.info("%s summary: %d ok, %d partial, %d error | T1=%d T2=%d T3=%d",
-                 site, stats["ok"], stats["partial"], stats["error"],
-                 stats["tiers"].get(1, 0), stats["tiers"].get(2, 0), stats["tiers"].get(3, 0))
+        log.info(
+            "%s summary: %d ok, %d partial, %d error | T1=%d T2=%d T3=%d",
+            site,
+            stats["ok"],
+            stats["partial"],
+            stats["error"],
+            stats["tiers"].get(1, 0),
+            stats["tiers"].get(2, 0),
+            stats["tiers"].get(3, 0),
+        )
         return stats

     with ThreadPoolExecutor(max_workers=min(workers, len(order))) as pool:
@@ -765,14 +787,29 @@ def _scrape_site(site: str) -> dict:
             stats = scrape_site_batch(conn, site, jobs, delay=delay, max_jobs=max_per_site)
             _merge_stats(stats)
-            log.info("Site summary: %d ok, %d partial, %d error | T1=%d T2=%d T3=%d",
-                     stats["ok"], stats["partial"], stats["error"],
-                     stats["tiers"].get(1, 0), stats["tiers"].get(2, 0), stats["tiers"].get(3, 0))
-
-    log.info("TOTAL: %d processed | %d ok | %d partial | %d error",
-             total_stats["processed"], total_stats["ok"], total_stats["partial"], total_stats["error"])
-    log.info("Tier distribution: T1=%d T2=%d T3=%d",
-             total_stats["tiers"].get(1, 0), total_stats["tiers"].get(2, 0), total_stats["tiers"].get(3, 0))
+            log.info(
+                "Site summary: %d ok, %d partial, %d error | T1=%d T2=%d T3=%d",
+                stats["ok"],
+                stats["partial"],
+                stats["error"],
+                stats["tiers"].get(1, 0),
+                stats["tiers"].get(2, 0),
+                stats["tiers"].get(3, 0),
+            )
+
+    log.info(
+        "TOTAL: %d processed | %d ok | %d partial | %d error",
+        total_stats["processed"],
+        total_stats["ok"],
+        total_stats["partial"],
+        total_stats["error"],
+    )
+    log.info(
+        "Tier distribution: T1=%d T2=%d T3=%d",
+        total_stats["tiers"].get(1, 0),
+        total_stats["tiers"].get(2, 0),
+        total_stats["tiers"].get(3, 0),
+    )

     llm_calls = total_stats["tiers"].get(3, 0)
     total = total_stats["processed"]
@@ -785,6 +822,7 @@ def _scrape_site(site: str) -> dict:

 # -- Streaming detail scraper (for sequential pipeline) ----------------------

+
 def stream_detail(
     upstream_done,
     my_done,
@@ -805,8 +843,7 @@ def stream_detail(
     conn = init_db()
     url_stats = resolve_all_urls(conn)
-    log.info("URL resolution: %d resolved, %d absolute",
-             url_stats['resolved'], url_stats['already_absolute'])
+    log.info("URL resolution: %d resolved, %d absolute", url_stats["resolved"], url_stats["already_absolute"])

     total_ok = 0
     total_err = 0
@@ -814,11 +851,10 @@ def stream_detail(
     try:
         while True:
-            skip_filter = " AND ".join(f"site != '{s}'" for s in SKIP_DETAIL_SITES)
+            placeholders = ",".join("?" * len(SKIP_DETAIL_SITES))
             rows = conn.execute(
-                "SELECT url, title, site FROM jobs "
-                f"WHERE detail_scraped_at IS NULL AND {skip_filter} "
-                "ORDER BY site LIMIT 200"
+                f"SELECT url, title, site FROM jobs WHERE detail_scraped_at IS NULL AND site NOT IN ({placeholders}) ORDER BY site LIMIT 200",
+                list(SKIP_DETAIL_SITES),
             ).fetchall()

             if rows:
@@ -835,8 +871,7 @@ def stream_detail(
                     stats = scrape_site_batch(conn, site, jobs, delay=delay)
                     total_ok += stats["ok"] + stats["partial"]
                     total_err += stats["error"]
-                    log.info("%s: %d ok, %d partial, %d error",
-                             site, stats['ok'], stats['partial'], stats['error'])
+                    log.info("%s: %d ok, %d partial, %d error", site, stats["ok"], stats["partial"], stats["error"])
                 except Exception as e:
                     log.error("%s: CRASHED: %s", site, e)
@@ -855,6 +890,7 @@ def stream_detail(

 # -- Public entry point ------------------------------------------------------

+
 def run_enrichment(limit: int = 100, workers: int = 1) -> dict:
     """Main entry point for detail page enrichment.

@@ -873,17 +909,17 @@ def run_enrichment(limit: int = 100, workers: int = 1) -> dict:

     # URL resolution first
     url_stats = resolve_all_urls(conn)
-    log.info("URL resolution: %d resolved, %d absolute, %d failed",
-             url_stats["resolved"], url_stats["already_absolute"], url_stats["failed"])
+    log.info(
+        "URL resolution: %d resolved, %d absolute, %d failed",
+        url_stats["resolved"],
+        url_stats["already_absolute"],
+        url_stats["failed"],
+    )

     # WTTJ special handling
-    wttj_count = conn.execute(
-        "SELECT COUNT(*) FROM jobs WHERE site = 'WelcomeToTheJungle'"
-    ).fetchone()[0]
+    wttj_count = conn.execute("SELECT COUNT(*) FROM jobs WHERE site = 'WelcomeToTheJungle'").fetchone()[0]
     if wttj_count > 0:
-        sample = conn.execute(
-            "SELECT url FROM jobs WHERE site = 'WelcomeToTheJungle' LIMIT 1"
-        ).fetchone()
+        sample = conn.execute("SELECT url FROM jobs WHERE site = 'WelcomeToTheJungle' LIMIT 1").fetchone()
         if sample and not sample[0].startswith("http"):
             updated = resolve_wttj_urls(conn)
             log.info("WTTJ: %d URLs updated", updated)
diff --git a/src/applypilot/llm.py b/src/applypilot/llm.py
index 1fb7be6..06b9b5b 100644
--- a/src/applypilot/llm.py
+++ b/src/applypilot/llm.py
@@ -9,8 +9,10 @@
 LLM_MODEL env var overrides the model name for any provider.
 """

+import atexit
 import logging
 import os
+import threading
 import time

 import httpx
@@ -21,6 +23,7 @@
 # Provider detection
 # ---------------------------------------------------------------------------

+
 def _detect_provider() -> tuple[str, str, str]:
     """Return (base_url, model, api_key) based on environment variables.

@@ -54,8 +57,7 @@ def _detect_provider() -> tuple[str, str, str]:
     )

     raise RuntimeError(
-        "No LLM provider configured. "
-        "Set GEMINI_API_KEY, OPENAI_API_KEY, or LLM_URL in your environment."
+        "No LLM provider configured. Set GEMINI_API_KEY, OPENAI_API_KEY, or LLM_URL in your environment."
     )
@@ -230,23 +232,23 @@ def chat(
                 resp = exc.response
                 if resp.status_code in (429, 503) and attempt < _MAX_RETRIES - 1:
                     # Respect Retry-After header if provided (Gemini sends this).
-                    retry_after = (
-                        resp.headers.get("Retry-After")
-                        or resp.headers.get("X-RateLimit-Reset-Requests")
-                    )
+                    retry_after = resp.headers.get("Retry-After") or resp.headers.get("X-RateLimit-Reset-Requests")
                     if retry_after:
                         try:
                             wait = float(retry_after)
                         except (ValueError, TypeError):
-                            wait = _RATE_LIMIT_BASE_WAIT * (2 ** attempt)
+                            wait = _RATE_LIMIT_BASE_WAIT * (2**attempt)
                     else:
-                        wait = min(_RATE_LIMIT_BASE_WAIT * (2 ** attempt), 60)
+                        wait = min(_RATE_LIMIT_BASE_WAIT * (2**attempt), 60)
                     log.warning(
                         "LLM rate limited (HTTP %s). Waiting %ds before retry %d/%d. "
                         "Tip: Gemini free tier = 15 RPM. Consider a paid account "
                         "or switching to a local model.",
-                        resp.status_code, wait, attempt + 1, _MAX_RETRIES,
+                        resp.status_code,
+                        wait,
+                        attempt + 1,
+                        _MAX_RETRIES,
                     )
                     time.sleep(wait)
                     continue
@@ -254,10 +256,12 @@ def chat(

             except httpx.TimeoutException:
                 if attempt < _MAX_RETRIES - 1:
-                    wait = min(_RATE_LIMIT_BASE_WAIT * (2 ** attempt), 60)
+                    wait = min(_RATE_LIMIT_BASE_WAIT * (2**attempt), 60)
                     log.warning(
                         "LLM request timed out, retrying in %ds (attempt %d/%d)",
-                        wait, attempt + 1, _MAX_RETRIES,
+                        wait,
+                        attempt + 1,
+                        _MAX_RETRIES,
                     )
                     time.sleep(wait)
                     continue
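
# Backoff math for the retry loop above, assuming _RATE_LIMIT_BASE_WAIT = 5
# (the actual constant is defined elsewhere in llm.py):
#
#   attempt 0 -> min(5 * 2**0, 60) =  5s
#   attempt 1 -> min(5 * 2**1, 60) = 10s
#   attempt 2 -> min(5 * 2**2, 60) = 20s
#   attempt 3 -> min(5 * 2**3, 60) = 40s
#   attempt 4 -> min(5 * 2**4, 60) = 60s (capped)
#
# A Retry-After header, when present, overrides the computed wait.
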
@@ -275,6 +279,7 @@ def close(self) -> None:

 class _GeminiCompatForbidden(Exception):
     """Sentinel: Gemini OpenAI-compat returned 403. Switch to native API."""
+
     def __init__(self, response: httpx.Response) -> None:
         self.response = response
         super().__init__(f"Gemini compat 403: {response.text[:200]}")
@@ -285,13 +290,17 @@ def __init__(self, response: httpx.Response) -> None:
 # ---------------------------------------------------------------------------

 _instance: LLMClient | None = None
+_lock = threading.Lock()


 def get_client() -> LLMClient:
-    """Return (or create) the module-level LLMClient singleton."""
+    """Return (or create) the module-level LLMClient singleton (thread-safe)."""
     global _instance
     if _instance is None:
-        base_url, model, api_key = _detect_provider()
-        log.info("LLM provider: %s model: %s", base_url, model)
-        _instance = LLMClient(base_url, model, api_key)
+        with _lock:
+            if _instance is None:  # double-check locking
+                base_url, model, api_key = _detect_provider()
+                log.info("LLM provider: %s model: %s", base_url, model)
+                _instance = LLMClient(base_url, model, api_key)
+                # Register cleanup inside the inner check so the hook is
+                # installed exactly once, even if two threads race here.
+                atexit.register(_instance.close)
     return _instance
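
# Why the second check: two threads can both observe _instance is None before
# either takes the lock. Serialized by _lock, the first thread builds the
# client; the second re-checks, sees the built instance, and skips
# construction -- so exactly one LLMClient (and one atexit hook) ever exists.
#
# Usage is unchanged from any thread:
#   client = get_client()
#   reply = client.chat([{"role": "user", "content": "ping"}])
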
diff --git a/src/applypilot/pipeline.py b/src/applypilot/pipeline.py
index 29881c5..abc9559 100644
--- a/src/applypilot/pipeline.py
+++ b/src/applypilot/pipeline.py
@@ -15,6 +15,7 @@
 import logging
 import threading
 import time
+from collections.abc import Callable
 from datetime import datetime

 from rich.console import Console
@@ -36,22 +37,22 @@
 STAGE_META: dict[str, dict] = {
     "discover": {"desc": "Job discovery (JobSpy + Workday + smart extract)"},
-    "enrich": {"desc": "Detail enrichment (full descriptions + apply URLs)"},
-    "score": {"desc": "LLM scoring (fit 1-10)"},
-    "tailor": {"desc": "Resume tailoring (LLM + validation)"},
-    "cover": {"desc": "Cover letter generation"},
-    "pdf": {"desc": "PDF conversion (tailored resumes + cover letters)"},
+    "enrich":   {"desc": "Detail enrichment (full descriptions + apply URLs)"},
+    "score":    {"desc": "LLM scoring (fit 1-10)"},
+    "tailor":   {"desc": "Resume tailoring (LLM + validation)"},
+    "cover":    {"desc": "Cover letter generation"},
+    "pdf":      {"desc": "PDF conversion (tailored resumes + cover letters)"},
 }

 # Upstream dependency: a stage only finishes when its upstream is done AND
 # it has no remaining pending work.
 _UPSTREAM: dict[str, str | None] = {
     "discover": None,
-    "enrich": "discover",
-    "score": "enrich",
-    "tailor": "score",
-    "cover": "tailor",
-    "pdf": "cover",
+    "enrich":   "discover",
+    "score":    "enrich",
+    "tailor":   "score",
+    "cover":    "tailor",
+    "pdf":      "cover",
 }


@@ -59,6 +60,7 @@
 # Individual stage runners
 # ---------------------------------------------------------------------------

+
 def _run_discover(workers: int = 1) -> dict:
     """Stage: Job discovery — JobSpy, Workday, and smart-extract scrapers."""
     stats: dict = {"jobspy": None, "workday": None, "smartextract": None}
@@ -67,6 +69,7 @@ def _run_discover(workers: int = 1) -> dict:
     console.print(" [cyan]JobSpy full crawl...[/cyan]")
     try:
         from applypilot.discovery.jobspy import run_discovery
+
         run_discovery()
         stats["jobspy"] = "ok"
     except Exception as e:
@@ -78,6 +81,7 @@ def _run_discover(workers: int = 1) -> dict:
     console.print(" [cyan]Workday corporate scraper...[/cyan]")
     try:
         from applypilot.discovery.workday import run_workday_discovery
+
         run_workday_discovery(workers=workers)
         stats["workday"] = "ok"
     except Exception as e:
@@ -89,6 +93,7 @@ def _run_discover(workers: int = 1) -> dict:
     console.print(" [cyan]Smart extract (AI-powered scraping)...[/cyan]")
     try:
         from applypilot.discovery.smartextract import run_smart_extract
+
         run_smart_extract(workers=workers)
         stats["smartextract"] = "ok"
     except Exception as e:
@@ -103,6 +108,7 @@ def _run_enrich(workers: int = 1) -> dict:
     """Stage: Detail enrichment — scrape full descriptions and apply URLs."""
     try:
         from applypilot.enrichment.detail import run_enrichment
+
         run_enrichment(workers=workers)
         return {"status": "ok"}
     except Exception as e:
@@ -114,6 +120,7 @@ def _run_score() -> dict:
     """Stage: LLM scoring — assign fit scores 1-10."""
     try:
         from applypilot.scoring.scorer import run_scoring
+
         run_scoring()
         return {"status": "ok"}
     except Exception as e:
@@ -125,6 +132,7 @@ def _run_tailor(min_score: int = 7, validation_mode: str = "normal") -> dict:
     """Stage: Resume tailoring — generate tailored resumes for high-fit jobs."""
     try:
         from applypilot.scoring.tailor import run_tailoring
+
         run_tailoring(min_score=min_score, validation_mode=validation_mode)
         return {"status": "ok"}
     except Exception as e:
@@ -136,6 +144,7 @@ def _run_cover(min_score: int = 7, validation_mode: str = "normal") -> dict:
     """Stage: Cover letter generation."""
     try:
         from applypilot.scoring.cover_letter import run_cover_letters
+
         run_cover_letters(min_score=min_score, validation_mode=validation_mode)
         return {"status": "ok"}
     except Exception as e:
@@ -147,6 +156,7 @@ def _run_pdf() -> dict:
     """Stage: PDF conversion — convert tailored resumes and cover letters to PDF."""
     try:
         from applypilot.scoring.pdf import batch_convert
+
         batch_convert()
         return {"status": "ok"}
     except Exception as e:
@@ -155,13 +165,13 @@ def _run_pdf() -> dict:


 # Map stage names to their runner functions
-_STAGE_RUNNERS: dict[str, callable] = {
+_STAGE_RUNNERS: dict[str, Callable] = {
     "discover": _run_discover,
-    "enrich": _run_enrich,
-    "score": _run_score,
-    "tailor": _run_tailor,
-    "cover": _run_cover,
-    "pdf": _run_pdf,
+    "enrich":   _run_enrich,
+    "score":    _run_score,
+    "tailor":   _run_tailor,
+    "cover":    _run_cover,
+    "pdf":      _run_pdf,
 }


@@ -169,6 +179,7 @@ def _run_pdf() -> dict:
 # Stage resolution
 # ---------------------------------------------------------------------------

+
 def _resolve_stages(stage_names: list[str]) -> list[str]:
     """Resolve 'all' and validate/order stage names."""
     if "all" in stage_names:
@@ -177,10 +188,7 @@ def _resolve_stages(stage_names: list[str]) -> list[str]:
     resolved = []
     for name in stage_names:
         if name not in STAGE_META:
-            console.print(
-                f"[red]Unknown stage:[/red] '{name}'. "
-                f"Available: {', '.join(STAGE_ORDER)}, all"
-            )
+            console.print(f"[red]Unknown stage:[/red] '{name}'. Available: {', '.join(STAGE_ORDER)}, all")
             raise SystemExit(1)
         if name not in resolved:
             resolved.append(name)
@@ -193,13 +201,12 @@ def _resolve_stages(stage_names: list[str]) -> list[str]:
 # Streaming pipeline helpers
 # ---------------------------------------------------------------------------

+
 class _StageTracker:
     """Thread-safe tracker for which stages have finished producing work."""

     def __init__(self):
-        self._events: dict[str, threading.Event] = {
-            stage: threading.Event() for stage in STAGE_ORDER
-        }
+        self._events: dict[str, threading.Event] = {stage: threading.Event() for stage in STAGE_ORDER}
         self._results: dict[str, dict] = {}
         self._lock = threading.Lock()
@@ -222,7 +229,7 @@ def get_results(self) -> dict[str, dict]:
 # SQL to count pending work for each stage
 _PENDING_SQL: dict[str, str] = {
     "enrich": "SELECT COUNT(*) FROM jobs WHERE detail_scraped_at IS NULL",
-    "score": "SELECT COUNT(*) FROM jobs WHERE full_description IS NOT NULL AND fit_score IS NULL",
+    "score":  "SELECT COUNT(*) FROM jobs WHERE full_description IS NOT NULL AND fit_score IS NULL",
     "tailor": (
         "SELECT COUNT(*) FROM jobs WHERE fit_score >= ? "
         "AND full_description IS NOT NULL "
@@ -234,10 +241,7 @@ def get_results(self) -> dict[str, dict]:
         "AND (cover_letter_path IS NULL OR cover_letter_path = '') "
         "AND COALESCE(cover_attempts, 0) < 5"
     ),
-    "pdf": (
-        "SELECT COUNT(*) FROM jobs WHERE tailored_resume_path IS NOT NULL "
-        "AND tailored_resume_path LIKE '%.txt'"
-    ),
+    "pdf": ("SELECT COUNT(*) FROM jobs WHERE tailored_resume_path IS NOT NULL AND tailored_resume_path LIKE '%.txt'"),
 }

 # How long to sleep between polling loops in streaming mode (seconds)
@@ -289,12 +293,11 @@ def _run_stage_streaming(
             tracker.mark_done(stage, {"status": f"error: {e}"})
         return

-    # For downstream stages: loop until upstream done + no pending work
     passes = 0
+    successes = 0
+    failures = 0
     while not stop_event.is_set():
-        # Wait for upstream to start producing work (first pass only)
         if passes == 0 and upstream and not tracker.is_done(upstream):
-            # Wait a bit for upstream to produce some work before first run
             tracker.wait(upstream, timeout=_STREAM_POLL_INTERVAL)

         pending = _count_pending(stage, min_score)
@@ -303,28 +306,28 @@ def _run_stage_streaming(
             try:
                 runner(**kwargs)
                 passes += 1
+                successes += 1
             except Exception as e:
                 log.error("Stage '%s' error (pass %d): %s", stage, passes, e)
                 passes += 1
+                failures += 1
         else:
-            # No work right now
             upstream_done = upstream is None or tracker.is_done(upstream)
             if upstream_done:
-                # No work and upstream is done — this stage is finished
                 break
-            # Upstream still running, wait and retry
            if stop_event.wait(timeout=_STREAM_POLL_INTERVAL):
-                break  # Stop requested
+                break

-    tracker.mark_done(stage, {"status": "ok", "passes": passes})
+    # A stage counts as "ok" if any pass succeeded (or it never had work);
+    # it is an "error" only when every pass that ran failed.
+    status = "error" if failures and not successes else "ok"
+    tracker.mark_done(stage, {"status": status, "passes": passes, "successes": successes, "failures": failures})
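
# Outcome matrix for the status rule above:
#
#   successes > 0                 -> "ok"    (work was done, even if some passes failed)
#   successes == 0, failures > 0  -> "error" (every pass that ran blew up)
#   successes == 0, failures == 0 -> "ok"    (nothing was ever pending for this stage)
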


 # ---------------------------------------------------------------------------
 # Pipeline orchestrators
 # ---------------------------------------------------------------------------

-def _run_sequential(ordered: list[str], min_score: int, workers: int = 1,
-                    validation_mode: str = "normal") -> dict:
+
+def _run_sequential(ordered: list[str], min_score: int, workers: int = 1, validation_mode: str = "normal") -> dict:
     """Execute stages one at a time (original behavior)."""
     results: list[dict] = []
     errors: dict[str, str] = {}
@@ -355,8 +358,7 @@ def _run_sequential(
             status = result.get("status", "ok")
             if name == "discover":
                 sub_errors = [
-                    f"{k}: {v}" for k, v in result.items()
-                    if isinstance(v, str) and v.startswith("error")
+                    f"{k}: {v}" for k, v in result.items() if isinstance(v, str) and v.startswith("error")
                 ]
                 if sub_errors:
                     status = "partial"
@@ -377,8 +379,7 @@ def _run_sequential(
     return {"stages": results, "errors": errors, "elapsed": total_elapsed}


-def _run_streaming(ordered: list[str], min_score: int, workers: int = 1,
-                   validation_mode: str = "normal") -> dict:
+def _run_streaming(ordered: list[str], min_score: int, workers: int = 1, validation_mode: str = "normal") -> dict:
     """Execute stages concurrently with DB as conveyor belt."""
     tracker = _StageTracker()
     stop_event = threading.Event()
@@ -413,9 +414,7 @@ def _run_streaming(
         for name in ordered:
             threads[name].join()
             elapsed = time.time() - start_times[name]
-            console.print(
-                f" [green]Completed:[/green] {name} ({elapsed:.1f}s)"
-            )
+            console.print(f" [green]Completed:[/green] {name} ({elapsed:.1f}s)")
     except KeyboardInterrupt:
         console.print("\n[yellow]Interrupted — stopping stages...[/yellow]")
         stop_event.set()
@@ -474,10 +473,12 @@ def run_pipeline(
     # Banner
     mode = "streaming" if stream else "sequential"
     console.print()
-    console.print(Panel.fit(
-        f"[bold]ApplyPilot Pipeline[/bold] ({mode})",
-        border_style="blue",
-    ))
+    console.print(
+        Panel.fit(
+            f"[bold]ApplyPilot Pipeline[/bold] ({mode})",
+            border_style="blue",
+        )
+    )
     console.print(f" Min score: {min_score}")
     console.print(f" Workers: {workers}")
     console.print(f" Validation: {validation_mode}")
@@ -497,11 +498,9 @@ def run_pipeline(
     # Execute
     if stream:
-        result = _run_streaming(ordered, min_score, workers=workers,
-                                validation_mode=validation_mode)
+        result = _run_streaming(ordered, min_score, workers=workers, validation_mode=validation_mode)
     else:
-        result = _run_sequential(ordered, min_score, workers=workers,
-                                 validation_mode=validation_mode)
+        result = _run_sequential(ordered, min_score, workers=workers, validation_mode=validation_mode)

     # Summary table
     console.print(f"\n{'=' * 70}")
diff --git a/src/applypilot/scoring/cover_letter.py b/src/applypilot/scoring/cover_letter.py
index c16cdd5..88970dc 100644
--- a/src/applypilot/scoring/cover_letter.py
+++ b/src/applypilot/scoring/cover_letter.py
@@ -28,6 +28,7 @@

 # ── Prompt Builder (profile-driven) ──────────────────────────────────────

+
 def _build_cover_letter_prompt(profile: dict) -> str:
     """Build the cover letter system prompt from the user's profile.

@@ -102,6 +103,7 @@ def _build_cover_letter_prompt(profile: dict) -> str:

 # ── Helpers ──────────────────────────────────────────────────────────────

+
 def _strip_preamble(text: str) -> str:
     """Remove LLM preamble before 'Dear Hiring Manager,' if present.
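
# The body of _strip_preamble is not shown in this hunk; a minimal version
# consistent with its docstring might look like the sketch below (treating
# the salutation string as the marker is an assumption):
def _strip_preamble_sketch(text: str) -> str:
    marker = "Dear Hiring Manager,"
    idx = text.find(marker)
    # Keep everything from the salutation onward; if the marker is missing
    # or already first, return the text unchanged.
    return text[idx:] if idx > 0 else text
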
@@ -117,9 +119,13 @@ def _strip_preamble(text: str) -> str:

 # ── Core Generation ──────────────────────────────────────────────────────

+
 def generate_cover_letter(
-    resume_text: str, job: dict, profile: dict,
-    max_retries: int = 3, validation_mode: str = "normal",
+    resume_text: str,
+    job: dict,
+    profile: dict,
+    max_retries: int = 3,
+    validation_mode: str = "normal",
 ) -> str:
     """Generate a cover letter with fresh context on each retry + auto-sanitize.

@@ -152,17 +158,14 @@ def generate_cover_letter(
         # Fresh conversation every attempt
         prompt = cl_prompt_base
         if avoid_notes:
-            prompt += "\n\n## AVOID THESE ISSUES:\n" + "\n".join(
-                f"- {n}" for n in avoid_notes[-5:]
-            )
+            prompt += "\n\n## AVOID THESE ISSUES:\n" + "\n".join(f"- {n}" for n in avoid_notes[-5:])

         messages = [
             {"role": "system", "content": prompt},
-            {"role": "user", "content": (
-                f"RESUME:\n{resume_text}\n\n---\n\n"
-                f"TARGET JOB:\n{job_text}\n\n"
-                "Write the cover letter:"
-            )},
+            {
+                "role": "user",
+                "content": (f"RESUME:\n{resume_text}\n\n---\n\nTARGET JOB:\n{job_text}\n\nWrite the cover letter:"),
+            },
         ]

         letter = client.chat(messages, max_tokens=1024, temperature=0.7)
@@ -177,7 +180,9 @@ def generate_cover_letter(
         # Warnings never block — only hard errors trigger a retry
         log.debug(
             "Cover letter attempt %d/%d failed: %s",
-            attempt + 1, max_retries + 1, validation["errors"],
+            attempt + 1,
+            max_retries + 1,
+            validation["errors"],
         )

     return letter  # last attempt even if failed
@@ -185,8 +190,8 @@ def generate_cover_letter(

 # ── Batch Entry Point ────────────────────────────────────────────────────

-def run_cover_letters(min_score: int = 7, limit: int = 20,
-                      validation_mode: str = "normal") -> dict:
+
+def run_cover_letters(min_score: int = 7, limit: int = 20, validation_mode: str = "normal") -> dict:
     """Generate cover letters for high-scoring jobs that have tailored resumes.

     Args:
@@ -198,6 +203,9 @@ def run_cover_letters(
         {"generated": int, "errors": int, "elapsed": float}
     """
     profile = load_profile()
+    if not RESUME_PATH.exists():
+        log.error("Resume file not found: %s. Run 'applypilot init' first.", RESUME_PATH)
+        return {"generated": 0, "errors": 0, "elapsed": 0.0}
     resume_text = RESUME_PATH.read_text(encoding="utf-8")

     conn = get_connection()
@@ -224,7 +232,8 @@ def run_cover_letters(
     COVER_LETTER_DIR.mkdir(parents=True, exist_ok=True)
     log.info(
         "Generating cover letters for %d jobs (score >= %d)...",
-        len(jobs), min_score,
+        len(jobs),
+        min_score,
     )
     t0 = time.time()
     completed = 0
@@ -234,8 +243,12 @@ def run_cover_letters(
     for job in jobs:
         completed += 1
         try:
-            letter = generate_cover_letter(resume_text, job, profile,
-                                           validation_mode=validation_mode)
+            tailored_path = job.get("tailored_resume_path")
+            if tailored_path and Path(tailored_path).exists():
+                job_resume = Path(tailored_path).read_text(encoding="utf-8")
+            else:
+                job_resume = resume_text
+            letter = generate_cover_letter(job_resume, job, profile, validation_mode=validation_mode)

             # Build safe filename prefix
             safe_title = re.sub(r"[^\w\s-]", "", job["title"])[:50].strip().replace(" ", "_")
@@ -249,6 +262,7 @@ def run_cover_letters(
             pdf_path = None
             try:
                 from applypilot.scoring.pdf import convert_to_pdf
+
                 pdf_path = str(convert_to_pdf(cl_path))
             except Exception:
                 log.debug("PDF generation failed for %s", cl_path, exc_info=True)
@@ -266,12 +280,19 @@ def run_cover_letters(
             rate = completed / elapsed if elapsed > 0 else 0
             log.info(
                 "%d/%d [OK] | %.1f jobs/min | %s",
-                completed, len(jobs), rate * 60, result["title"][:40],
+                completed,
+                len(jobs),
+                rate * 60,
+                result["title"][:40],
             )
         except Exception as e:
             result = {
-                "url": job["url"], "title": job["title"], "site": job["site"],
-                "path": None, "pdf_path": None, "error": str(e),
+                "url": job["url"],
+                "title": job["title"],
+                "site": job["site"],
+                "path": None,
+                "pdf_path": None,
+                "error": str(e),
             }
             error_count += 1
         results.append(result)
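
# Worked example of the filename sanitizer above (the title is hypothetical):
#
#   job["title"] = "Senior ML Engineer (Remote!)"
#   re.sub(r"[^\w\s-]", "", ...)    -> "Senior ML Engineer Remote"
#   [:50].strip().replace(" ", "_") -> "Senior_ML_Engineer_Remote"
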
diff --git a/src/applypilot/scoring/pdf.py b/src/applypilot/scoring/pdf.py
index 2b87b67..40cf3f4 100644
--- a/src/applypilot/scoring/pdf.py
+++ b/src/applypilot/scoring/pdf.py
@@ -14,6 +14,7 @@

 # ── Resume Parser ────────────────────────────────────────────────────────

+
 def parse_resume(text: str) -> dict:
     """Parse a structured text resume into sections.

@@ -126,9 +127,7 @@ def parse_entries(text: str) -> list[dict]:
             if current:
                 current["bullets"].append(stripped[2:].strip())
         elif current is None or (
-            not stripped.startswith("-")
-            and not stripped.startswith("\u2022")
-            and len(current.get("bullets", [])) > 0
+            not stripped.startswith("-") and not stripped.startswith("\u2022") and len(current.get("bullets", [])) > 0
         ):
             # New entry
             if current:
@@ -148,6 +147,7 @@ def parse_entries(text: str) -> list[dict]:

 # ── HTML Template ────────────────────────────────────────────────────────

+
 def build_html(resume: dict) -> str:
     """Build professional resume HTML from parsed data.

@@ -194,7 +194,9 @@ def build_html(resume: dict) -> str:
     edu_html = ""
     if "EDUCATION" in sections:
         edu_text = sections["EDUCATION"].strip()
-        edu_html = f'
 {desc_preview}...
- {"