diff --git a/Makefile b/Makefile index 274729ac..48659bb2 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: help install install-dev test test-unit test-integration test-cov test-fast test-watch lint format typecheck security clean dev taskiq-worker taskiq-scheduler docker-build docker-run docker-test sync-kr-symbol-universe sync-upbit-symbol-universe sync-us-symbol-universe +.PHONY: help install install-dev test test-unit test-integration test-cov test-fast test-watch lint format typecheck security clean dev taskiq-worker taskiq-scheduler docker-build docker-run docker-test sync-kr-symbol-universe sync-upbit-symbol-universe sync-us-symbol-universe sync-kr-candles-backfill sync-kr-candles-incremental help: ## Show this help message @echo "Available commands:" @@ -72,6 +72,12 @@ sync-upbit-symbol-universe: ## Sync Upbit symbol universe for crypto symbol reso sync-us-symbol-universe: ## Sync US symbol universe for US symbol/exchange resolution uv run python scripts/sync_us_symbol_universe.py +sync-kr-candles-backfill: ## Backfill KR candles for recent sessions + uv run python scripts/sync_kr_candles.py --mode backfill --sessions 10 + +sync-kr-candles-incremental: ## Incremental KR candles sync (venue-gated) + uv run python scripts/sync_kr_candles.py --mode incremental + docker-build: ## Build Docker image docker build -t auto-trader . diff --git a/alembic/versions/87541fdbc954_add_kr_candles_timescale.py b/alembic/versions/87541fdbc954_add_kr_candles_timescale.py new file mode 100644 index 00000000..ed55e09a --- /dev/null +++ b/alembic/versions/87541fdbc954_add_kr_candles_timescale.py @@ -0,0 +1,211 @@ +"""add_kr_candles_timescale + +Revision ID: 87541fdbc954 +Revises: 9f2c6db7a41e +Create Date: 2026-02-23 15:14:41.031755 + +""" + +from collections.abc import Sequence + +from alembic import op + +# revision identifiers, used by Alembic. 
+revision: str = "87541fdbc954" +down_revision: str | Sequence[str] | None = "9f2c6db7a41e" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + """Upgrade schema.""" + op.execute( + """ + DO $$ + DECLARE + v_extversion TEXT; + v_version_core TEXT; + v_parts TEXT[]; + v_major INTEGER; + v_minor INTEGER; + v_patch INTEGER; + BEGIN + SELECT extversion + INTO v_extversion + FROM pg_extension + WHERE extname = 'timescaledb'; + + IF v_extversion IS NULL THEN + RAISE EXCEPTION 'timescaledb extension is not installed'; + END IF; + + v_version_core := split_part(v_extversion, '-', 1); + v_parts := regexp_split_to_array(v_version_core, '\\.'); + + v_major := COALESCE(v_parts[1], '0')::INTEGER; + v_minor := COALESCE(v_parts[2], '0')::INTEGER; + v_patch := COALESCE(v_parts[3], '0')::INTEGER; + + IF (v_major, v_minor, v_patch) < (2, 8, 1) THEN + RAISE EXCEPTION + 'timescaledb extension version % is below required minimum 2.8.1', + v_extversion; + END IF; + END + $$ + """ + ) + + op.execute( + """ + CREATE TABLE public.kr_candles_1m ( + time TIMESTAMPTZ NOT NULL, + symbol TEXT NOT NULL, + venue TEXT NOT NULL, + open NUMERIC NOT NULL, + high NUMERIC NOT NULL, + low NUMERIC NOT NULL, + close NUMERIC NOT NULL, + volume NUMERIC NOT NULL, + value NUMERIC NOT NULL, + CONSTRAINT ck_kr_candles_1m_venue CHECK (venue IN ('KRX', 'NTX')), + CONSTRAINT uq_kr_candles_1m_time_symbol_venue UNIQUE (time, symbol, venue) + ) + """ + ) + + op.execute( + """ + SELECT create_hypertable( + 'public.kr_candles_1m', + 'time', + migrate_data => TRUE + ) + """ + ) + + op.execute( + """ + CREATE INDEX ix_kr_candles_1m_symbol_time_desc + ON public.kr_candles_1m (symbol, time DESC) + """ + ) + + op.execute( + """ + CREATE MATERIALIZED VIEW public.kr_candles_1h + WITH ( + timescaledb.continuous, + timescaledb.materialized_only = false + ) + AS + SELECT + time_bucket(INTERVAL '1 hour', time, 'Asia/Seoul') AS bucket, + symbol, + FIRST( + open, + ((extract(epoch from time) * 1000000)::bigint * 2 + + CASE WHEN venue = 'KRX' THEN 0 ELSE 1 END) + ) AS open, + MAX(high) AS high, + MIN(low) AS low, + LAST( + close, + ((extract(epoch from time) * 1000000)::bigint * 2 + + CASE WHEN venue = 'KRX' THEN 1 ELSE 0 END) + ) AS close, + SUM(volume) AS volume, + SUM(value) AS value, + array_agg(DISTINCT venue ORDER BY venue) AS venues + FROM public.kr_candles_1m + GROUP BY bucket, symbol + WITH NO DATA + """ + ) + + op.execute( + """ + DO $$ + BEGIN + IF to_regclass('public.kr_candles_1h') IS NOT NULL THEN + EXECUTE $sql$ + SELECT remove_continuous_aggregate_policy( + 'public.kr_candles_1h', + if_exists => TRUE + ) + $sql$; + + EXECUTE $sql$ + SELECT add_continuous_aggregate_policy( + 'public.kr_candles_1h', + start_offset => INTERVAL '2 days', + end_offset => INTERVAL '1 hour', + schedule_interval => INTERVAL '5 minutes' + ) + $sql$; + END IF; + END + $$ + """ + ) + + op.execute( + """ + DO $$ + DECLARE + v_start TIMESTAMPTZ; + v_end TIMESTAMPTZ; + v_refresh_end TIMESTAMPTZ; + BEGIN + IF to_regclass('public.kr_candles_1h') IS NULL THEN + RETURN; + END IF; + + SELECT MIN(time), MAX(time) + INTO v_start, v_end + FROM public.kr_candles_1m; + + IF v_start IS NOT NULL AND v_end IS NOT NULL THEN + v_refresh_end := LEAST( + v_end + INTERVAL '1 hour', + date_trunc('hour', now() - INTERVAL '1 hour') + ); + + IF v_refresh_end <= v_start THEN + RETURN; + END IF; + + CALL refresh_continuous_aggregate( + 'public.kr_candles_1h', + v_start, + v_refresh_end + ); + END IF; + END + $$ + """ + ) 
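+    # The one-off refresh above materializes existing minute history up to roughly
+    # one hour behind now; the policy registered earlier keeps newer buckets
+    # refreshed on a 5-minute schedule.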
+ + +def downgrade() -> None: + """Downgrade schema.""" + op.execute( + """ + DO $$ + BEGIN + IF to_regclass('public.kr_candles_1h') IS NOT NULL THEN + EXECUTE $sql$ + SELECT remove_continuous_aggregate_policy( + 'public.kr_candles_1h', + if_exists => TRUE + ) + $sql$; + END IF; + END + $$ + """ + ) + + op.execute("DROP MATERIALIZED VIEW IF EXISTS public.kr_candles_1h") + op.execute("DROP INDEX IF EXISTS public.ix_kr_candles_1m_symbol_time_desc") + op.execute("DROP TABLE IF EXISTS public.kr_candles_1m") diff --git a/alembic/versions/d31f0a2b4c6d_add_kr_candles_retention_policy.py b/alembic/versions/d31f0a2b4c6d_add_kr_candles_retention_policy.py new file mode 100644 index 00000000..d7b65123 --- /dev/null +++ b/alembic/versions/d31f0a2b4c6d_add_kr_candles_retention_policy.py @@ -0,0 +1,64 @@ +from collections.abc import Sequence + +from alembic import op + +revision: str = "d31f0a2b4c6d" +down_revision: str | Sequence[str] | None = "87541fdbc954" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + op.execute( + """ + DO $$ + BEGIN + IF to_regclass('public.kr_candles_1m') IS NOT NULL THEN + PERFORM remove_retention_policy( + 'public.kr_candles_1m', + if_exists => TRUE + ); + PERFORM add_retention_policy( + 'public.kr_candles_1m', + INTERVAL '90 days' + ); + END IF; + + IF to_regclass('public.kr_candles_1h') IS NOT NULL THEN + PERFORM remove_retention_policy( + 'public.kr_candles_1h', + if_exists => TRUE + ); + PERFORM add_retention_policy( + 'public.kr_candles_1h', + INTERVAL '90 days' + ); + END IF; + END + $$ + """ + ) + + +def downgrade() -> None: + op.execute( + """ + DO $$ + BEGIN + IF to_regclass('public.kr_candles_1m') IS NOT NULL THEN + PERFORM remove_retention_policy( + 'public.kr_candles_1m', + if_exists => TRUE + ); + END IF; + + IF to_regclass('public.kr_candles_1h') IS NOT NULL THEN + PERFORM remove_retention_policy( + 'public.kr_candles_1h', + if_exists => TRUE + ); + END IF; + END + $$ + """ + ) diff --git a/app/jobs/kr_candles.py b/app/jobs/kr_candles.py new file mode 100644 index 00000000..5e40c5d9 --- /dev/null +++ b/app/jobs/kr_candles.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +import logging + +from app.services.kr_candles_sync_service import sync_kr_candles + +logger = logging.getLogger(__name__) + + +async def run_kr_candles_sync( + *, + mode: str, + sessions: int = 10, + user_id: int = 1, +) -> dict[str, object]: + try: + result = await sync_kr_candles(mode=mode, sessions=sessions, user_id=user_id) + return { + "status": "completed", + **result, + } + except Exception as exc: + logger.error("KR candles sync failed: %s", exc, exc_info=True) + return { + "status": "failed", + "mode": mode, + "error": str(exc), + } diff --git a/app/mcp_server/README.md b/app/mcp_server/README.md index 9bd1af88..eb802b30 100644 --- a/app/mcp_server/README.md +++ b/app/mcp_server/README.md @@ -29,14 +29,19 @@ MCP tools (market data, portfolio, order execution) exposed via `fastmcp`. 
- `4h`: crypto only - `1h`: KR/US equity + crypto - US OHLCV remains Yahoo-based (`app.services.brokers.yahoo.client.fetch_ohlcv`) - - KR `1h` includes in-progress (partial) hourly candle - - KR `1h` resolves symbol route from `kr_symbol_universe` (`nxt_eligible=true` -> `UN`, else `J`) - - KR `1h` returns an explicit error when symbol is missing/inactive in `kr_symbol_universe` - - KR `1h` keeps `UN` empty-result behavior without `J` fallback - - KR `1h` expands intraday coverage with same-day `end_time` cursor pagination before moving to prior dates - - KR `1h` same-day pagination is capped at `10` internal API calls per trading day - - KR `1h` historical backfill uses route-specific cutoff (`J=153000`, `UN=200000`) - - KR `1h` prerequisite: run `make sync-kr-symbol-universe` (or `uv run python scripts/sync_kr_symbol_universe.py`) right after migrations + - KR `1h` history is DB-first from Timescale continuous aggregate `public.kr_candles_1h` + - KR `1h` includes the in-progress (partial) hourly candle by rebuilding the current hour in-memory from: + - `public.kr_candles_1m` (minute DB) + KIS minute API (up to 30 rows) + - KIS minute venues are merged with strict dedup to prevent double-counting (API overwrites DB per minute+venue) + - KIS minute API call plan (KST): + - `09:00 <= now < 15:35`: call KRX (`J`) + NTX (`NX`) in parallel when `nxt_eligible=true` (15:35 delay defense) + - `08:00 <= now < 09:00`: call NTX (`NX`) only when `nxt_eligible=true` + - `15:35 <= now < 20:00`: call NTX (`NX`) only when `nxt_eligible=true` + - When `end_date` is in the past: DB-only (0 API calls) + - KR `1h` returns an explicit error when symbol is missing/inactive in `kr_symbol_universe` (used for `nxt_eligible`) + - KR `1h` does not use Redis OHLCV cache (`kis_ohlcv_cache`) + - KR `1h` treats partial API failure (KRX/NTX) as a tool-level error (exception) + - KR `1h` response rows add `session` and `venues` fields (KR `1h` only; other market/period schemas unchanged) - `get_indicators(symbol, indicators, market=None)` - `get_volume_profile(symbol, market=None, period=60, bins=20)` - `get_order_history(symbol=None, status="all", order_id=None, limit=50)` diff --git a/app/mcp_server/tooling/market_data_quotes.py b/app/mcp_server/tooling/market_data_quotes.py index 114c1845..54032939 100644 --- a/app/mcp_server/tooling/market_data_quotes.py +++ b/app/mcp_server/tooling/market_data_quotes.py @@ -8,16 +8,12 @@ from __future__ import annotations import datetime -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, cast from zoneinfo import ZoneInfo -import pandas as pd -from sqlalchemy import select - import app.services.brokers.upbit.client as upbit_service import app.services.brokers.yahoo.client as yahoo_service from app.core.config import settings -from app.core.db import AsyncSessionLocal from app.mcp_server.tooling.market_data_indicators import ( IndicatorType, _compute_crypto_realtime_rsi_from_frame, @@ -42,9 +38,9 @@ from app.mcp_server.tooling.shared import ( resolve_market_type as _resolve_market_type, ) -from app.models.kr_symbol_universe import KRSymbolUniverse from app.services import kis_ohlcv_cache from app.services.brokers.kis.client import KISClient +from app.services.kr_hourly_candles_read_service import read_kr_hourly_candles_1h from app.services.kr_symbol_universe_service import search_kr_symbols from app.services.upbit_symbol_universe_service import search_upbit_symbols from app.services.us_symbol_universe_service import search_us_symbols @@ -143,8 +139,12 @@ 
async def _fetch_quote_equity_us(symbol: str) -> dict[str, Any]: f"Yahoo quote fetch failed for '{normalized_symbol}': {exc}" ) from exc + close_raw = fast_info.get("close") + if close_raw is None: + raise ValueError(not_found_message) from None + try: - price = float(fast_info.get("close")) + price = float(close_raw) except (TypeError, ValueError): raise ValueError(not_found_message) from None @@ -222,131 +222,6 @@ async def _fetch_ohlcv_crypto( _KST = ZoneInfo("Asia/Seoul") -_KR_ROUTE_CLOSE = { - "J": "153000", - "UN": "200000", -} -_KR_ROUTE_START = { - "J": "090000", - "UN": "080000", -} -_KR_INTRADAY_MAX_PAGE_CALLS_PER_DAY = 10 -_KR_UNIVERSE_SYNC_COMMAND = "uv run python scripts/sync_kr_symbol_universe.py" - - -def _kr_universe_sync_hint() -> str: - return f"Sync required: {_KR_UNIVERSE_SYNC_COMMAND}" - - -async def _resolve_kr_intraday_route(symbol: str) -> str: - normalized_symbol = str(symbol or "").strip().upper() - async with AsyncSessionLocal() as db: - query = select(KRSymbolUniverse).where( - KRSymbolUniverse.symbol == normalized_symbol - ) - result = await db.execute(query) - universe = result.scalar_one_or_none() - if universe is None: - async with AsyncSessionLocal() as db: - has_any_rows_result = await db.execute( - select(KRSymbolUniverse.symbol).limit(1) - ) - has_any_rows = has_any_rows_result.scalar_one_or_none() - if has_any_rows is None: - raise ValueError(f"kr_symbol_universe is empty. {_kr_universe_sync_hint()}") - raise ValueError( - f"KR symbol '{normalized_symbol}' is not registered in kr_symbol_universe. " - f"{_kr_universe_sync_hint()}" - ) - if not universe.is_active: - raise ValueError( - f"KR symbol '{normalized_symbol}' is inactive in kr_symbol_universe. " - f"{_kr_universe_sync_hint()}" - ) - return "UN" if universe.nxt_eligible else "J" - - -def _resolve_kr_intraday_end_time(route_market: str, target_day: datetime.date) -> str: - session_close = _KR_ROUTE_CLOSE[route_market] - now_kst = datetime.datetime.now(_KST) - if target_day < now_kst.date(): - return session_close - now_hhmmss = now_kst.strftime("%H%M%S") - return min(now_hhmmss, session_close) - - -def _filter_kr_intraday_session( - frame: pd.DataFrame, - route_market: str, -) -> pd.DataFrame: - if frame.empty or "datetime" not in frame.columns: - return frame - out = frame.copy() - out["datetime"] = pd.to_datetime(out["datetime"], errors="coerce") - out = out.dropna(subset=["datetime"]) - if out.empty: - return out - start = _KR_ROUTE_START[route_market] - end = _KR_ROUTE_CLOSE[route_market] - hhmmss = out["datetime"].dt.strftime("%H%M%S") - return out.loc[(hhmmss >= start) & (hhmmss <= end)].reset_index(drop=True) - - -async def _page_kr_intraday_day( - kis: KISClient, - symbol: str, - route_market: str, - target_day: datetime.date, - initial_end_time: str, - max_page_calls: int = _KR_INTRADAY_MAX_PAGE_CALLS_PER_DAY, -) -> pd.DataFrame: - session_start = _KR_ROUTE_START[route_market] - page_limit = max(int(max_page_calls), 1) - end_time = initial_end_time - merged = pd.DataFrame() - - for _ in range(page_limit): - intraday = await kis.inquire_time_dailychartprice( - code=symbol, - market=route_market, - n=200, - end_date=target_day, - end_time=end_time, - ) - intraday = _filter_kr_intraday_session(intraday, route_market) - if intraday.empty: - break - - merged = pd.concat([merged, intraday], ignore_index=True) - if "datetime" in merged.columns: - merged["datetime"] = pd.to_datetime(merged["datetime"], errors="coerce") - merged = merged.dropna(subset=["datetime"]) - merged = ( - 
merged.drop_duplicates(subset=["datetime"], keep="last") - .sort_values("datetime") - .reset_index(drop=True) - ) - else: - merged = merged.drop_duplicates().reset_index(drop=True) - - if "datetime" not in intraday.columns: - break - - intraday_datetimes = pd.to_datetime(intraday["datetime"], errors="coerce") - intraday_datetimes = intraday_datetimes.dropna() - if intraday_datetimes.empty: - break - - oldest = intraday_datetimes.min() - next_end_time = (oldest - datetime.timedelta(minutes=1)).strftime("%H%M%S") - if next_end_time < session_start: - break - if next_end_time == end_time: - break - - end_time = next_end_time - - return merged async def _fetch_ohlcv_equity_kr( @@ -381,61 +256,11 @@ async def _raw_fetch_day(requested_count: int): else: df = await _raw_fetch_day(capped_count) elif period == "1h": - route_market = await _resolve_kr_intraday_route(symbol) - - async def _raw_fetch_1h(requested_count: int): - aggregate_fn = getattr(KISClient, "_aggregate_intraday_to_hour", None) - target_count = max(int(requested_count), 1) - current_day = ( - end_date.date() if end_date else datetime.datetime.now(_KST).date() - ) - estimated_days = (target_count + 5) // 6 - max_fetch_days = min(max(estimated_days + 3, 3), 120) - - merged = pd.DataFrame() - for _ in range(max_fetch_days): - end_time = _resolve_kr_intraday_end_time(route_market, current_day) - intraday = await _page_kr_intraday_day( - kis=kis, - symbol=symbol, - route_market=route_market, - target_day=current_day, - initial_end_time=end_time, - max_page_calls=_KR_INTRADAY_MAX_PAGE_CALLS_PER_DAY, - ) - if callable(aggregate_fn): - hourly = aggregate_fn(intraday) - else: - hourly = intraday - - if not hourly.empty: - merged = pd.concat([merged, hourly], ignore_index=True) - if "datetime" in merged.columns: - merged = ( - merged.drop_duplicates(subset=["datetime"], keep="last") - .sort_values("datetime") - .reset_index(drop=True) - ) - else: - merged = merged.drop_duplicates().reset_index(drop=True) - if len(merged) >= target_count: - break - - current_day = current_day - datetime.timedelta(days=1) - - return merged.tail(target_count).reset_index(drop=True) - - use_cache = end_date is None and settings.kis_ohlcv_cache_enabled - if use_cache: - df = await kis_ohlcv_cache.get_candles( - symbol=symbol, - count=capped_count, - period="1h", - raw_fetcher=_raw_fetch_1h, - route=route_market, - ) - else: - df = await _raw_fetch_1h(capped_count) + df = await read_kr_hourly_candles_1h( + symbol=symbol, + count=capped_count, + end_date=end_date, + ) else: kis_period_map = {"week": "W", "month": "M"} df = await kis.inquire_daily_itemchartprice( @@ -640,7 +465,7 @@ async def _get_indicators_impl( raise ValueError( f"Invalid indicator '{ind}'. 
Valid options: {', '.join(sorted(valid_indicators))}" ) - normalized_indicators.append(ind_lower) + normalized_indicators.append(cast(IndicatorType, ind_lower)) market_type, symbol = _resolve_market_type(normalized_symbol, market) diff --git a/app/services/kr_candles_sync_service.py b/app/services/kr_candles_sync_service.py new file mode 100644 index 00000000..28f9e79f --- /dev/null +++ b/app/services/kr_candles_sync_service.py @@ -0,0 +1,700 @@ +from __future__ import annotations + +import logging +from collections.abc import Sequence +from dataclasses import dataclass +from datetime import UTC, date, datetime, time, timedelta +from functools import lru_cache +from typing import Literal, cast +from zoneinfo import ZoneInfo + +import exchange_calendars as xcals +import pandas as pd +from sqlalchemy import select, text +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.db import AsyncSessionLocal +from app.models.kr_symbol_universe import KRSymbolUniverse +from app.models.manual_holdings import MarketType +from app.services.brokers.kis.client import KISClient +from app.services.manual_holdings_service import ManualHoldingsService + +logger = logging.getLogger(__name__) + +_KST = ZoneInfo("Asia/Seoul") +_OVERLAP_MINUTES = 5 +_DEFAULT_BOOTSTRAP_SESSIONS = 10 +_MAX_PAGE_CALLS_PER_DAY = 30 + + +@dataclass(frozen=True, slots=True) +class VenueConfig: + venue: str + market_code: str + session_start: time + session_end: time + + +@dataclass(frozen=True, slots=True) +class MinuteCandleRow: + time_utc: datetime + local_time: datetime + symbol: str + venue: str + open: float + high: float + low: float + close: float + volume: float + value: float + + +_VENUE_CONFIG: dict[str, VenueConfig] = { + "KRX": VenueConfig( + venue="KRX", + market_code="J", + session_start=time(9, 0, 0), + session_end=time(15, 30, 0), + ), + "NTX": VenueConfig( + venue="NTX", + market_code="NX", + session_start=time(8, 0, 0), + session_end=time(20, 0, 0), + ), +} + +_CURSOR_SQL = text( + """ + SELECT MAX(time) + FROM public.kr_candles_1m + WHERE symbol = :symbol + AND venue = :venue + """ +) + +_UPSERT_SQL = text( + """ + INSERT INTO public.kr_candles_1m + (time, symbol, venue, open, high, low, close, volume, value) + VALUES + (:time, :symbol, :venue, :open, :high, :low, :close, :volume, :value) + ON CONFLICT (time, symbol, venue) + DO UPDATE SET + open = EXCLUDED.open, + high = EXCLUDED.high, + low = EXCLUDED.low, + close = EXCLUDED.close, + volume = EXCLUDED.volume, + value = EXCLUDED.value + WHERE + kr_candles_1m.open IS DISTINCT FROM EXCLUDED.open + OR kr_candles_1m.high IS DISTINCT FROM EXCLUDED.high + OR kr_candles_1m.low IS DISTINCT FROM EXCLUDED.low + OR kr_candles_1m.close IS DISTINCT FROM EXCLUDED.close + OR kr_candles_1m.volume IS DISTINCT FROM EXCLUDED.volume + OR kr_candles_1m.value IS DISTINCT FROM EXCLUDED.value + """ +) + + +@lru_cache(maxsize=1) +def _get_xkrx_calendar(): + return xcals.get_calendar("XKRX") + + +def _normalize_mode(mode: str) -> Literal["incremental", "backfill"]: + normalized = str(mode or "").strip().lower() + if normalized not in {"incremental", "backfill"}: + raise ValueError("mode must be 'incremental' or 'backfill'") + return cast(Literal["incremental", "backfill"], normalized) + + +def _normalize_symbol(value: object) -> str | None: + text_value = str(value or "").strip().upper() + if not text_value: + return None + if len(text_value) < 6: + text_value = text_value.zfill(6) + if len(text_value) == 6 and text_value.isalnum(): + return text_value + return None + + +def 
_parse_float(value: object) -> float | None: + try: + if value is None: + return None + return float(str(value)) + except (TypeError, ValueError): + return None + + +def _build_symbol_union( + kis_holdings: Sequence[object], + manual_holdings: Sequence[object], +) -> set[str]: + symbols: set[str] = set() + + for item in kis_holdings: + if isinstance(item, dict): + raw_symbol = cast(object | None, item.get("pdno")) + else: + raw_symbol = getattr(item, "pdno", None) + symbol = _normalize_symbol(raw_symbol) + if symbol is not None: + symbols.add(symbol) + + for holding in manual_holdings: + ticker = getattr(holding, "ticker", None) + symbol = _normalize_symbol(ticker) + if symbol is not None: + symbols.add(symbol) + + return symbols + + +def _validate_universe_rows( + *, + target_symbols: set[str], + universe_rows: list[KRSymbolUniverse], + table_has_rows: bool, +) -> dict[str, KRSymbolUniverse]: + if not table_has_rows: + raise ValueError("kr_symbol_universe is empty") + + rows_by_symbol = {row.symbol: row for row in universe_rows} + missing = sorted(target_symbols - set(rows_by_symbol)) + if missing: + preview = ", ".join(missing[:10]) + raise ValueError( + f"KR symbol is not registered in kr_symbol_universe: " + f"count={len(missing)} symbols=[{preview}]" + ) + + inactive = sorted( + symbol + for symbol in target_symbols + if symbol in rows_by_symbol and not rows_by_symbol[symbol].is_active + ) + if inactive: + preview = ", ".join(inactive[:10]) + raise ValueError( + f"KR symbol is inactive in kr_symbol_universe: " + f"count={len(inactive)} symbols=[{preview}]" + ) + + return {symbol: rows_by_symbol[symbol] for symbol in target_symbols} + + +def _build_venue_plan( + rows_by_symbol: dict[str, KRSymbolUniverse], +) -> dict[str, list[VenueConfig]]: + plan: dict[str, list[VenueConfig]] = {} + for symbol in sorted(rows_by_symbol): + row = rows_by_symbol[symbol] + if row.nxt_eligible: + plan[symbol] = [_VENUE_CONFIG["KRX"], _VENUE_CONFIG["NTX"]] + else: + plan[symbol] = [_VENUE_CONFIG["KRX"]] + return plan + + +def _is_session_day_kst(target_day: date) -> bool: + calendar = _get_xkrx_calendar() + return bool(calendar.is_session(pd.Timestamp(target_day))) + + +def _should_process_venue( + *, + mode: Literal["incremental", "backfill"], + now_kst: datetime, + is_session_day: bool, + venue: VenueConfig, +) -> tuple[bool, str | None]: + if mode == "backfill": + return True, None + + if not is_session_day: + return False, "holiday" + + now_clock = time(now_kst.hour, now_kst.minute, now_kst.second) + if now_clock < venue.session_start or now_clock > venue.session_end: + return False, "outside_session" + + return True, None + + +def _compute_incremental_cutoff_kst(cursor_utc: datetime | None) -> datetime | None: + if cursor_utc is None: + return None + + if cursor_utc.tzinfo is None: + normalized_cursor = cursor_utc.replace(tzinfo=UTC) + else: + normalized_cursor = cursor_utc.astimezone(UTC) + + return normalized_cursor.astimezone(_KST) - timedelta(minutes=_OVERLAP_MINUTES) + + +def _convert_kis_datetime_to_utc(value: datetime) -> datetime: + if value.tzinfo is None: + localized = value.replace(tzinfo=_KST) + else: + localized = value.astimezone(_KST) + return localized.astimezone(UTC) + + +def _recent_session_days( + now_kst: datetime, + sessions: int, + *, + include_today: bool, +) -> list[date]: + calendar = _get_xkrx_calendar() + lookback_days = max(90, sessions * 8) + start = pd.Timestamp(now_kst.date() - timedelta(days=lookback_days)) + end = pd.Timestamp(now_kst.date()) + session_index = 
calendar.sessions_in_range(start, end) + days = [pd.Timestamp(value).date() for value in session_index] + if not include_today and days and days[-1] == now_kst.date(): + days = days[:-1] + if not days: + return [] + return days[-sessions:] + + +def _day_before_cutoff( + *, + target_day: date, + venue: VenueConfig, + cutoff_kst: datetime | None, +) -> bool: + if cutoff_kst is None: + return False + day_end = datetime.combine(target_day, venue.session_end, tzinfo=_KST) + return day_end < cutoff_kst + + +def _initial_end_time(now_kst: datetime, target_day: date, venue: VenueConfig) -> str: + close_hhmmss = venue.session_end.strftime("%H%M%S") + if target_day < now_kst.date(): + return close_hhmmss + now_hhmmss = now_kst.strftime("%H%M%S") + return min(now_hhmmss, close_hhmmss) + + +def _normalize_intraday_rows( + *, + frame: pd.DataFrame, + symbol: str, + venue: VenueConfig, + target_day: date, +) -> list[MinuteCandleRow]: + if frame.empty: + return [] + + rows: list[MinuteCandleRow] = [] + for item in frame.to_dict("records"): + raw_datetime = item.get("datetime") + if raw_datetime is None: + continue + + parsed = pd.to_datetime(str(raw_datetime), errors="coerce") + if pd.isna(parsed): + continue + + parsed_dt = parsed.to_pydatetime() + if parsed_dt.tzinfo is None: + local_dt = parsed_dt.replace(tzinfo=_KST) + else: + local_dt = parsed_dt.astimezone(_KST) + local_dt = local_dt.replace(second=0, microsecond=0) + + if local_dt.date() != target_day: + continue + + local_clock = time(local_dt.hour, local_dt.minute, local_dt.second) + if local_clock < venue.session_start or local_clock > venue.session_end: + continue + + open_value = _parse_float(item.get("open")) + high_value = _parse_float(item.get("high")) + low_value = _parse_float(item.get("low")) + close_value = _parse_float(item.get("close")) + volume_value = _parse_float(item.get("volume")) + value_value = _parse_float(item.get("value")) + + if ( + open_value is None + or high_value is None + or low_value is None + or close_value is None + or volume_value is None + or value_value is None + ): + continue + + rows.append( + MinuteCandleRow( + time_utc=_convert_kis_datetime_to_utc(local_dt), + local_time=local_dt, + symbol=symbol, + venue=venue.venue, + open=float(open_value), + high=float(high_value), + low=float(low_value), + close=float(close_value), + volume=float(volume_value), + value=float(value_value), + ) + ) + + deduped: dict[datetime, MinuteCandleRow] = {} + for row in rows: + deduped[row.time_utc] = row + return [deduped[key] for key in sorted(deduped)] + + +async def _read_cursor_utc( + session: AsyncSession, + *, + symbol: str, + venue: str, +) -> datetime | None: + result = await session.execute(_CURSOR_SQL, {"symbol": symbol, "venue": venue}) + value = result.scalar_one_or_none() + if isinstance(value, datetime): + return value + return None + + +async def _upsert_rows(session: AsyncSession, rows: list[MinuteCandleRow]) -> int: + if not rows: + return 0 + + payload = [ + { + "time": row.time_utc, + "symbol": row.symbol, + "venue": row.venue, + "open": row.open, + "high": row.high, + "low": row.low, + "close": row.close, + "volume": row.volume, + "value": row.value, + } + for row in rows + ] + _ = await session.execute(_UPSERT_SQL, payload) + return len(payload) + + +async def _collect_day_rows( + *, + kis: KISClient, + symbol: str, + venue: VenueConfig, + target_day: date, + initial_end_time: str, + cutoff_kst: datetime | None, +) -> tuple[list[MinuteCandleRow], int, bool, bool]: + merged: dict[datetime, MinuteCandleRow] = {} 
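+    # Page backward through the session using an end_time cursor: each call requests
+    # up to 200 minute rows, and `merged` dedupes by UTC minute (later-fetched pages
+    # overwrite earlier ones).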
+ end_time = initial_end_time + page_calls = 0 + reached_cutoff = False + + for _ in range(_MAX_PAGE_CALLS_PER_DAY): + page_calls += 1 + frame = await kis.inquire_time_dailychartprice( + code=symbol, + market=venue.market_code, + n=200, + end_date=target_day, + end_time=end_time, + ) + if frame.empty: + ordered = [merged[key] for key in sorted(merged)] + return ordered, page_calls, reached_cutoff, True + + page_rows = _normalize_intraday_rows( + frame=frame, + symbol=symbol, + venue=venue, + target_day=target_day, + ) + if not page_rows: + ordered = [merged[key] for key in sorted(merged)] + return ordered, page_calls, reached_cutoff, True + + earliest_local = min(row.local_time for row in page_rows) + + for row in page_rows: + if cutoff_kst is not None and row.local_time < cutoff_kst: + reached_cutoff = True + continue + merged[row.time_utc] = row + + next_cursor = earliest_local - timedelta(minutes=1) + if cutoff_kst is not None and next_cursor < cutoff_kst: + reached_cutoff = True + + if reached_cutoff: + break + + if next_cursor.date() != target_day: + break + + next_clock = time(next_cursor.hour, next_cursor.minute, next_cursor.second) + if next_clock < venue.session_start: + break + + next_end_time = next_cursor.strftime("%H%M%S") + if next_end_time == end_time: + break + end_time = next_end_time + + ordered = [merged[key] for key in sorted(merged)] + return ordered, page_calls, reached_cutoff, False + + +async def _sync_symbol_venue( + *, + session: AsyncSession, + kis: KISClient, + symbol: str, + venue: VenueConfig, + mode: Literal["incremental", "backfill"], + now_kst: datetime, + backfill_days: list[date] | None, +) -> dict[str, int | bool | str]: + cursor_utc = await _read_cursor_utc(session, symbol=symbol, venue=venue.venue) + cutoff_kst = _compute_incremental_cutoff_kst(cursor_utc) + + if mode == "backfill": + if not backfill_days: + return { + "rows_upserted": 0, + "days_processed": 0, + "pages_fetched": 0, + "empty_response": True, + } + earliest_day = backfill_days[0] + cutoff_kst = datetime.combine(earliest_day, venue.session_start, tzinfo=_KST) + allowed_days: set[date] | None = set(backfill_days) + else: + if cutoff_kst is None: + bootstrap_days = _recent_session_days( + now_kst, + _DEFAULT_BOOTSTRAP_SESSIONS, + include_today=True, + ) + if bootstrap_days: + cutoff_kst = datetime.combine( + bootstrap_days[0], + venue.session_start, + tzinfo=_KST, + ) + allowed_days = None + + if cutoff_kst is not None and cutoff_kst > now_kst: + cutoff_kst = now_kst + + rows_upserted = 0 + pages_fetched = 0 + days_processed = 0 + saw_empty_response = False + current_day = now_kst.date() + + while True: + if _day_before_cutoff( + target_day=current_day, venue=venue, cutoff_kst=cutoff_kst + ): + break + + if allowed_days is not None: + if current_day < min(allowed_days): + break + if current_day not in allowed_days: + current_day = current_day - timedelta(days=1) + continue + + if not _is_session_day_kst(current_day): + current_day = current_day - timedelta(days=1) + continue + + initial_end_time = _initial_end_time(now_kst, current_day, venue) + day_rows, page_calls, reached_cutoff, empty_response = await _collect_day_rows( + kis=kis, + symbol=symbol, + venue=venue, + target_day=current_day, + initial_end_time=initial_end_time, + cutoff_kst=cutoff_kst, + ) + pages_fetched += page_calls + days_processed += 1 + + if day_rows: + rows_upserted += await _upsert_rows(session, day_rows) + elif empty_response: + saw_empty_response = True + logger.warning( + "KR candles sync empty response 
symbol=%s venue=%s day=%s end_time=%s", + symbol, + venue.venue, + current_day.isoformat(), + initial_end_time, + ) + + if reached_cutoff: + break + + if empty_response: + current_day = current_day - timedelta(days=1) + continue + + current_day = current_day - timedelta(days=1) + + return { + "rows_upserted": rows_upserted, + "days_processed": days_processed, + "pages_fetched": pages_fetched, + "empty_response": saw_empty_response, + } + + +async def _load_universe_context( + session: AsyncSession, + target_symbols: set[str], +) -> tuple[list[KRSymbolUniverse], bool]: + has_rows_result = await session.execute(select(KRSymbolUniverse.symbol).limit(1)) + table_has_rows = has_rows_result.scalar_one_or_none() is not None + + if not target_symbols: + return [], table_has_rows + + result = await session.execute( + select(KRSymbolUniverse).where(KRSymbolUniverse.symbol.in_(target_symbols)) + ) + rows = list(result.scalars().all()) + return rows, table_has_rows + + +async def sync_kr_candles( + *, + mode: str, + sessions: int = 10, + user_id: int = 1, +) -> dict[str, object]: + normalized_mode = _normalize_mode(mode) + session_count = max(int(sessions), 1) + now_kst = datetime.now(_KST) + session_day_today = _is_session_day_kst(now_kst.date()) + + kis = KISClient() + + session = cast(AsyncSession, cast(object, AsyncSessionLocal())) + try: + kis_holdings = await kis.fetch_my_stocks() + manual_service = ManualHoldingsService(session) + manual_holdings = await manual_service.get_holdings_by_user( + user_id=user_id, + market_type=MarketType.KR, + ) + target_symbols = _build_symbol_union(kis_holdings, manual_holdings) + if not target_symbols: + return { + "mode": normalized_mode, + "sessions": session_count, + "skipped": True, + "reason": "no_target_symbols", + "symbols_total": 0, + "symbol_venues_total": 0, + "pairs_processed": 0, + "pairs_skipped": 0, + "rows_upserted": 0, + "pages_fetched": 0, + } + + universe_rows, table_has_rows = await _load_universe_context( + session, + target_symbols, + ) + rows_by_symbol = _validate_universe_rows( + target_symbols=target_symbols, + universe_rows=universe_rows, + table_has_rows=table_has_rows, + ) + venue_plan = _build_venue_plan(rows_by_symbol) + + backfill_days: list[date] | None = None + if normalized_mode == "backfill": + include_today = now_kst.time() >= _VENUE_CONFIG["KRX"].session_end + backfill_days = _recent_session_days( + now_kst, + session_count, + include_today=include_today, + ) + + pairs_total = sum(len(venues) for venues in venue_plan.values()) + pairs_processed = 0 + pairs_skipped = 0 + rows_upserted = 0 + pages_fetched = 0 + skipped_reasons: dict[str, int] = {} + + for symbol, venues in venue_plan.items(): + for venue in venues: + should_process, skip_reason = _should_process_venue( + mode=normalized_mode, + now_kst=now_kst, + is_session_day=session_day_today, + venue=venue, + ) + if not should_process: + pairs_skipped += 1 + if skip_reason is not None: + skipped_reasons[skip_reason] = ( + skipped_reasons.get(skip_reason, 0) + 1 + ) + continue + + try: + stats = await _sync_symbol_venue( + session=session, + kis=kis, + symbol=symbol, + venue=venue, + mode=normalized_mode, + now_kst=now_kst, + backfill_days=backfill_days, + ) + await session.commit() + except Exception: + await session.rollback() + raise + + pairs_processed += 1 + rows_upserted += int(stats["rows_upserted"]) + pages_fetched += int(stats["pages_fetched"]) + + skipped = pairs_processed == 0 + return { + "mode": normalized_mode, + "sessions": session_count, + "skipped": skipped, 
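+            # Per-reason skip counts (e.g. "holiday", "outside_session").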
+ "skip_reasons": skipped_reasons, + "symbols_total": len(target_symbols), + "symbol_venues_total": pairs_total, + "pairs_processed": pairs_processed, + "pairs_skipped": pairs_skipped, + "rows_upserted": rows_upserted, + "pages_fetched": pages_fetched, + } + finally: + await session.close() + + +__all__ = ["sync_kr_candles"] diff --git a/app/services/kr_hourly_candles_read_service.py b/app/services/kr_hourly_candles_read_service.py new file mode 100644 index 00000000..99372ffc --- /dev/null +++ b/app/services/kr_hourly_candles_read_service.py @@ -0,0 +1,547 @@ +from __future__ import annotations + +import asyncio +import datetime +from dataclasses import dataclass +from typing import Literal, cast +from zoneinfo import ZoneInfo + +import pandas as pd +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.db import AsyncSessionLocal +from app.services.brokers.kis.client import KISClient + +_KST = ZoneInfo("Asia/Seoul") + +_KR_UNIVERSE_SYNC_COMMAND = "uv run python scripts/sync_kr_symbol_universe.py" + + +def _kr_universe_sync_hint() -> str: + return f"Sync required: {_KR_UNIVERSE_SYNC_COMMAND}" + + +SessionType = Literal["PRE_MARKET", "REGULAR", "AFTER_MARKET"] +VenueType = Literal["KRX", "NTX"] + + +def _async_session() -> AsyncSession: + return cast(AsyncSession, cast(object, AsyncSessionLocal())) + + +@dataclass(frozen=True, slots=True) +class _UniverseRow: + symbol: str + nxt_eligible: bool + is_active: bool + + +@dataclass(frozen=True, slots=True) +class _MinuteRow: + minute_time: datetime.datetime + venue: VenueType + open: float + high: float + low: float + close: float + volume: float + value: float + + +_KR_UNIVERSE_HAS_ANY_ROWS_SQL = text( + """ + SELECT symbol + FROM public.kr_symbol_universe + LIMIT 1 + """ +) + +_KR_UNIVERSE_ROW_SQL = text( + """ + SELECT symbol, nxt_eligible, is_active + FROM public.kr_symbol_universe + WHERE symbol = :symbol + """ +) + +_KR_HOURLY_SQL = text( + """ + SELECT bucket, open, high, low, close, volume, value, venues + FROM public.kr_candles_1h + WHERE symbol = :symbol + AND bucket <= :end_time + ORDER BY bucket DESC + LIMIT :limit + """ +) + +_KR_MINUTE_SQL = text( + """ + SELECT time, venue, open, high, low, close, volume, value + FROM public.kr_candles_1m + WHERE symbol = :symbol + AND time >= :start_time + AND time < :end_time + """ +) + + +def _ensure_kst_aware(value: datetime.datetime) -> datetime.datetime: + if value.tzinfo is None: + return value.replace(tzinfo=_KST) + return value.astimezone(_KST) + + +def _to_kst_naive(value: datetime.datetime) -> datetime.datetime: + return _ensure_kst_aware(value).replace(tzinfo=None) + + +def _to_float(value: object) -> float: + if value is None: + return 0.0 + if isinstance(value, (int, float)): + return float(value) + return float(str(value)) + + +def _to_venue(value: object) -> VenueType: + text_value = str(value or "").strip().upper() + if text_value == "KRX": + return "KRX" + if text_value == "NTX": + return "NTX" + raise ValueError(f"Unexpected KR venue: {value}") + + +def _normalize_venues(value: object) -> list[str]: + venues: list[str] = [] + if value is None: + return venues + if isinstance(value, (list, tuple)): + venues = [str(v).strip().upper() for v in value if str(v).strip()] + else: + venues = [str(value).strip().upper()] + order = {"KRX": 0, "NTX": 1} + venues = [v for v in venues if v in order] + venues.sort(key=lambda v: order[v]) + return venues + + +def _session_for_bucket_start( + bucket_start_kst_naive: datetime.datetime, +) -> SessionType | 
None: + t = bucket_start_kst_naive.time() + if datetime.time(8, 0, 0) <= t < datetime.time(9, 0, 0): + return "PRE_MARKET" + if datetime.time(9, 0, 0) <= t < datetime.time(15, 30, 0): + return "REGULAR" + if datetime.time(15, 30, 0) <= t <= datetime.time(20, 0, 0): + return "AFTER_MARKET" + return None + + +def _should_call_api( + *, now_kst: datetime.datetime, end_date: datetime.datetime | None +) -> bool: + if end_date is not None: + end_day = ( + _ensure_kst_aware(end_date).date() if end_date.tzinfo else end_date.date() + ) + if end_day < now_kst.date(): + return False + + now_clock = now_kst.time() + if now_clock < datetime.time(8, 0, 0): + return False + if now_clock >= datetime.time(20, 0, 0): + return False + return True + + +def _api_markets_for_now( + *, + now_kst: datetime.datetime, + nxt_eligible: bool, + end_date: datetime.datetime | None, +) -> list[str]: + if not _should_call_api(now_kst=now_kst, end_date=end_date): + return [] + + now_clock = now_kst.time() + if datetime.time(8, 0, 0) <= now_clock < datetime.time(9, 0, 0): + return ["NX"] if nxt_eligible else [] + if datetime.time(9, 0, 0) <= now_clock < datetime.time(15, 35, 0): + return ["J", "NX"] if nxt_eligible else ["J"] + if datetime.time(15, 35, 0) <= now_clock < datetime.time(20, 0, 0): + return ["NX"] if nxt_eligible else [] + return [] + + +async def _resolve_universe_row( + symbol: str, +) -> _UniverseRow: + normalized_symbol = str(symbol or "").strip().upper() + async with _async_session() as session: + has_any_rows = ( + await session.execute(_KR_UNIVERSE_HAS_ANY_ROWS_SQL) + ).scalar_one_or_none() + result = await session.execute( + _KR_UNIVERSE_ROW_SQL, + {"symbol": normalized_symbol}, + ) + rows = list(result.mappings().all()) + + if not rows: + if has_any_rows is None: + raise ValueError(f"kr_symbol_universe is empty. {_kr_universe_sync_hint()}") + raise ValueError( + f"KR symbol '{normalized_symbol}' is not registered in kr_symbol_universe. " + f"{_kr_universe_sync_hint()}" + ) + + row = rows[0] + is_active = bool(row.get("is_active")) + if not is_active: + raise ValueError( + f"KR symbol '{normalized_symbol}' is inactive in kr_symbol_universe. 
" + f"{_kr_universe_sync_hint()}" + ) + + return _UniverseRow( + symbol=normalized_symbol, + nxt_eligible=bool(row.get("nxt_eligible")), + is_active=is_active, + ) + + +async def _fetch_hour_rows( + *, + symbol: str, + end_time_kst: datetime.datetime, + limit: int, +) -> list[dict[str, object]]: + async with _async_session() as session: + result = await session.execute( + _KR_HOURLY_SQL, + { + "symbol": symbol, + "end_time": end_time_kst, + "limit": int(limit), + }, + ) + return [{str(k): v for k, v in row.items()} for row in result.mappings().all()] + + +async def _fetch_minute_rows( + *, + symbol: str, + start_time_kst: datetime.datetime, + end_time_kst: datetime.datetime, +) -> list[dict[str, object]]: + async with _async_session() as session: + result = await session.execute( + _KR_MINUTE_SQL, + { + "symbol": symbol, + "start_time": start_time_kst, + "end_time": end_time_kst, + }, + ) + return [{str(k): v for k, v in row.items()} for row in result.mappings().all()] + + +def _build_hour_frame( + *, + hour_rows: list[dict[str, object]], + current_hour_row: dict[str, object] | None, + count: int, + current_bucket_start: datetime.datetime | None, +) -> pd.DataFrame: + rows: list[dict[str, object]] = [] + drop_bucket = current_bucket_start + + for row in hour_rows: + bucket_raw = row.get("bucket") + if not isinstance(bucket_raw, datetime.datetime): + continue + bucket_naive = _to_kst_naive(bucket_raw) + if drop_bucket is not None and bucket_naive == drop_bucket: + continue + + session = _session_for_bucket_start(bucket_naive) + if session is None: + continue + + venues = _normalize_venues(row.get("venues")) + rows.append( + { + "datetime": bucket_naive, + "date": bucket_naive.date(), + "time": bucket_naive.time(), + "open": _to_float(row.get("open")), + "high": _to_float(row.get("high")), + "low": _to_float(row.get("low")), + "close": _to_float(row.get("close")), + "volume": _to_float(row.get("volume")), + "value": _to_float(row.get("value")), + "session": session, + "venues": venues, + } + ) + + if current_hour_row is not None: + bucket_raw = current_hour_row.get("datetime") + if isinstance(bucket_raw, datetime.datetime): + session = _session_for_bucket_start(bucket_raw) + if session is not None: + current = dict(current_hour_row) + current["session"] = session + current["date"] = bucket_raw.date() + current["time"] = bucket_raw.time() + current["venues"] = _normalize_venues(current_hour_row.get("venues")) + rows.append(current) + + if not rows: + return pd.DataFrame( + columns=[ + "datetime", + "date", + "time", + "open", + "high", + "low", + "close", + "volume", + "value", + "session", + "venues", + ] + ) + + out = pd.DataFrame(rows) + out["datetime"] = pd.to_datetime(out["datetime"], errors="coerce") + out = out.dropna(subset=["datetime"]).sort_values("datetime").reset_index(drop=True) + out = out.tail(max(int(count), 1)).reset_index(drop=True) + return out + + +async def _build_current_hour_row( + *, + symbol: str, + now_kst: datetime.datetime, + nxt_eligible: bool, + end_date: datetime.datetime | None, +) -> tuple[dict[str, object] | None, datetime.datetime | None]: + current_bucket_start_kst = now_kst.replace(minute=0, second=0, microsecond=0) + current_bucket_naive = current_bucket_start_kst.replace(tzinfo=None) + + if _session_for_bucket_start(current_bucket_naive) is None: + return None, None + + if end_date is not None: + end_day = ( + _ensure_kst_aware(end_date).date() if end_date.tzinfo else end_date.date() + ) + if end_day != now_kst.date(): + return None, None + + 
start_time_kst = current_bucket_start_kst + end_time_kst = start_time_kst + datetime.timedelta(hours=1) + db_minutes = await _fetch_minute_rows( + symbol=symbol, + start_time_kst=start_time_kst, + end_time_kst=end_time_kst, + ) + + minute_by_key: dict[tuple[datetime.datetime, VenueType], _MinuteRow] = {} + for row in db_minutes: + time_raw = row.get("time") + venue_raw = row.get("venue") + if not isinstance(time_raw, datetime.datetime): + continue + venue = _to_venue(venue_raw) + minute_time = _to_kst_naive(time_raw).replace(second=0, microsecond=0) + if not ( + current_bucket_naive + <= minute_time + < current_bucket_naive + datetime.timedelta(hours=1) + ): + continue + minute_by_key[(minute_time, venue)] = _MinuteRow( + minute_time=minute_time, + venue=venue, + open=_to_float(row.get("open")), + high=_to_float(row.get("high")), + low=_to_float(row.get("low")), + close=_to_float(row.get("close")), + volume=_to_float(row.get("volume")), + value=_to_float(row.get("value")), + ) + + markets = _api_markets_for_now( + now_kst=now_kst, + nxt_eligible=nxt_eligible, + end_date=end_date, + ) + + if markets: + kis = KISClient() + api_date = now_kst.date() + + async def _fetch_one(market: str) -> pd.DataFrame: + return await kis.inquire_minute_chart( + code=symbol, + market=market, + time_unit=1, + n=30, + end_date=api_date, + ) + + frames = await asyncio.gather(*[_fetch_one(m) for m in markets]) + for market, frame in zip(markets, frames, strict=False): + if frame is None or frame.empty: + continue + venue: VenueType = "KRX" if market == "J" else "NTX" + if "datetime" not in frame.columns: + continue + dt_series = pd.to_datetime(frame["datetime"], errors="coerce") + for pos, dt_val in enumerate(dt_series.tolist()): + if pd.isna(dt_val): + continue + minute_time = ( + pd.Timestamp(dt_val) + .to_pydatetime() + .replace(second=0, microsecond=0) + ) + if not ( + current_bucket_naive + <= minute_time + < current_bucket_naive + datetime.timedelta(hours=1) + ): + continue + src = frame.iloc[pos] + minute_by_key[(minute_time, venue)] = _MinuteRow( + minute_time=minute_time, + venue=venue, + open=_to_float(src.get("open")), + high=_to_float(src.get("high")), + low=_to_float(src.get("low")), + close=_to_float(src.get("close")), + volume=_to_float(src.get("volume")), + value=_to_float(src.get("value")), + ) + + if not minute_by_key: + return None, current_bucket_naive + + minutes_by_time: dict[datetime.datetime, dict[VenueType, _MinuteRow]] = {} + venues_seen: set[str] = set() + for (minute_time, venue), row in minute_by_key.items(): + venues_seen.add(venue) + minutes_by_time.setdefault(minute_time, {})[venue] = row + + combined: list[_MinuteRow] = [] + for minute_time in sorted(minutes_by_time): + group = minutes_by_time[minute_time] + source = group.get("KRX") or group.get("NTX") + if source is None: + continue + volume = sum(r.volume for r in group.values()) + value = sum(r.value for r in group.values()) + combined.append( + _MinuteRow( + minute_time=minute_time, + venue=source.venue, + open=source.open, + high=source.high, + low=source.low, + close=source.close, + volume=volume, + value=value, + ) + ) + + if not combined: + return None, current_bucket_naive + + open_ = combined[0].open + high_ = max(m.high for m in combined) + low_ = min(m.low for m in combined) + close_ = combined[-1].close + volume_ = sum(m.volume for m in combined) + value_ = sum(m.value for m in combined) + venues = _normalize_venues( + sorted(venues_seen, key=lambda v: 0 if v == "KRX" else 1) + ) + + return ( + { + "datetime": 
current_bucket_naive, + "open": float(open_), + "high": float(high_), + "low": float(low_), + "close": float(close_), + "volume": float(volume_), + "value": float(value_), + "venues": venues, + }, + current_bucket_naive, + ) + + +async def read_kr_hourly_candles_1h( + *, + symbol: str, + count: int, + end_date: datetime.datetime | None, + now_kst: datetime.datetime | None = None, +) -> pd.DataFrame: + capped_count = max(int(count), 1) + resolved_now = _ensure_kst_aware(now_kst or datetime.datetime.now(_KST)) + + universe = await _resolve_universe_row(symbol) + + if end_date is None: + end_time_kst = resolved_now + else: + end_day = ( + _ensure_kst_aware(end_date).date() if end_date.tzinfo else end_date.date() + ) + end_time_kst = datetime.datetime.combine( + end_day, + datetime.time(20, 0, 0), + tzinfo=_KST, + ) + + fetch_limit = min(max(capped_count * 3, capped_count + 24), 1000) + hour_rows = await _fetch_hour_rows( + symbol=universe.symbol, + end_time_kst=end_time_kst, + limit=fetch_limit, + ) + + current_hour_row, current_bucket_start = await _build_current_hour_row( + symbol=universe.symbol, + now_kst=resolved_now, + nxt_eligible=universe.nxt_eligible, + end_date=end_date, + ) + + out = _build_hour_frame( + hour_rows=hour_rows, + current_hour_row=current_hour_row, + count=capped_count, + current_bucket_start=current_bucket_start, + ) + + if len(out) < capped_count: + raise ValueError( + f"DB does not have enough KR 1h candles for {universe.symbol}: " + f"requested={capped_count} returned={len(out)}" + ) + + return out + + +__all__ = ["read_kr_hourly_candles_1h"] diff --git a/app/tasks/__init__.py b/app/tasks/__init__.py index 3856011d..c8e9a0c9 100644 --- a/app/tasks/__init__.py +++ b/app/tasks/__init__.py @@ -1,5 +1,6 @@ from app.tasks import ( daily_scan_tasks, + kr_candles_tasks, kr_symbol_universe_tasks, upbit_symbol_universe_tasks, us_symbol_universe_tasks, @@ -9,6 +10,7 @@ TASKIQ_TASK_MODULES = ( daily_scan_tasks, watch_scan_tasks, + kr_candles_tasks, kr_symbol_universe_tasks, upbit_symbol_universe_tasks, us_symbol_universe_tasks, diff --git a/app/tasks/kr_candles_tasks.py b/app/tasks/kr_candles_tasks.py new file mode 100644 index 00000000..277c6a5c --- /dev/null +++ b/app/tasks/kr_candles_tasks.py @@ -0,0 +1,24 @@ +from __future__ import annotations + +import logging + +from app.core.taskiq_broker import broker +from app.jobs.kr_candles import run_kr_candles_sync + +logger = logging.getLogger(__name__) + + +@broker.task( + task_name="candles.kr.sync", + schedule=[{"cron": "*/10 * * * 1-5", "cron_offset": "Asia/Seoul"}], +) +async def sync_kr_candles_incremental_task() -> dict[str, object]: + try: + return await run_kr_candles_sync(mode="incremental") + except Exception as exc: + logger.error("TaskIQ KR candles sync failed: %s", exc, exc_info=True) + return { + "status": "failed", + "mode": "incremental", + "error": str(exc), + } diff --git a/docker-compose.yml b/docker-compose.yml index 5e1082af..659cce3d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,7 @@ name: auto-trader services: db: - image: postgres:17 + image: timescale/timescaledb-ha:pg17 container_name: auto_trader_pg environment: POSTGRES_USER: postgres diff --git a/docs/plans/2026-02-23-kr-candles-local-backfill-validation-design.md b/docs/plans/2026-02-23-kr-candles-local-backfill-validation-design.md new file mode 100644 index 00000000..bbf03ad1 --- /dev/null +++ b/docs/plans/2026-02-23-kr-candles-local-backfill-validation-design.md @@ -0,0 +1,68 @@ +# KR Candles Local Backfill Validation 
Design
+
+## Context
+- PR #186 scope introduced KR minute-candle storage (`kr_candles_1m`) and hourly continuous aggregate (`kr_candles_1h`).
+- Local runtime validation target is real KIS-backed ingestion, not mock or dry-run execution.
+- Execution path should stay identical to production entrypoint behavior.
+
+## Goal
+- Validate local `backfill` ingestion for the most recent 3 trading sessions using real API calls.
+- Confirm data exists in `kr_candles_1m` and is materialized/available in `kr_candles_1h`.
+
+## Scope
+- In scope:
+  - Run `scripts/sync_kr_candles.py` with `--mode backfill --sessions 3`.
+  - Verify 1m and 1h data presence using SQL queries.
+  - Report pass/fail strictly against agreed success criteria.
+- Out of scope:
+  - TaskIQ scheduler/worker orchestration validation.
+  - Incremental mode behavior checks.
+  - Production deploy/migration workflow changes.
+
+## Execution Architecture
+1. Preconditions:
+   - DB connection and Timescale extension available.
+   - `kr_candles_1m` / `kr_candles_1h` objects exist.
+   - `kr_symbol_universe` has active symbols for target holdings.
+   - Local KIS credentials are valid.
+2. Ingestion:
+   - Run:
+     - `uv run python scripts/sync_kr_candles.py --mode backfill --sessions 3`
+3. Validation:
+   - Query `kr_candles_1m` grouped by `symbol/venue` for count and time bounds.
+   - Query `kr_candles_1h` for recent buckets.
+   - Optionally inspect a representative symbol (for example `005930`) in both tables.
+
+## Success Criteria
+1. `public.kr_candles_1m` contains backfilled rows for the executed scope.
+2. `public.kr_candles_1h` returns hourly aggregated rows for symbols present in minute data.
+
+## Failure Criteria And Handling
+- Preconditions fail (`kr_symbol_universe` empty/inactive/missing, Timescale object missing):
+  - Stop execution and report the specific blocker.
+- Script result fails (`status != completed` or non-zero exit):
+  - Report the primary error message and stop.
+- Script succeeds but table checks fail:
+  - Mark overall validation as failed with a data-level diagnosis.
+
+## Validation Queries
+```sql
+SELECT symbol, venue, COUNT(*) AS cnt, MIN(time) AS min_time, MAX(time) AS max_time
+FROM public.kr_candles_1m
+GROUP BY symbol, venue
+ORDER BY cnt DESC
+LIMIT 20;
+```
+
+```sql
+SELECT symbol, bucket, open, high, low, close, volume, value, venues
+FROM public.kr_candles_1h
+ORDER BY bucket DESC
+LIMIT 50;
+```
+
+```sql
+SELECT
+  (SELECT COUNT(*) FROM public.kr_candles_1m WHERE symbol = '005930') AS m1_rows,
+  (SELECT COUNT(*) FROM public.kr_candles_1h WHERE symbol = '005930') AS h1_rows;
+```
diff --git a/docs/plans/2026-02-23-kr-candles-local-backfill-validation-implementation-plan.md b/docs/plans/2026-02-23-kr-candles-local-backfill-validation-implementation-plan.md
new file mode 100644
index 00000000..a48e6dd7
--- /dev/null
+++ b/docs/plans/2026-02-23-kr-candles-local-backfill-validation-implementation-plan.md
@@ -0,0 +1,177 @@
+# KR Candles Local Backfill Validation Implementation Plan
+
+> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
+
+**Goal:** Run a real KIS-backed backfill of the 3 most recent KR trading sessions in the local environment, then verify `kr_candles_1m` ingestion and `kr_candles_1h` hourly candle generation.
+
+**Architecture:** Execute the existing production entrypoint (`scripts/sync_kr_candles.py`) as-is, with no new code. Before running, verify the preconditions (DB/Timescale/symbol universe/credentials); after running, judge the success criteria (1m ingested + 1h queryable) via SQL validation. On failure, immediately classify the cause following the @systematic-debugging flow and retry.
+
+**Tech Stack:** Python 3.13+, uv, PostgreSQL/TimescaleDB, SQLAlchemy AsyncSession, KIS Open API
+
+---
+
+### Task 1: Preflight Guard Checks
+
+**Files:**
+- Reference: `scripts/sync_kr_candles.py`
+- Reference: `app/services/kr_candles_sync_service.py`
+- Reference: `scripts/sql/kr_candles_timescale.sql`
+
+**Step 1: Write the failing preflight check command**
+
+```bash
+uv run python - <<'PY'
+import asyncio
+from sqlalchemy import text
+from app.core.config import settings
+from app.core.db import AsyncSessionLocal
+
+required = ["kis_app_key", "kis_app_secret", "DATABASE_URL"]
+missing = [name for name in required if not getattr(settings, name, None)]
+if missing:
+    raise SystemExit(f"Missing required settings: {missing}")
+
+async def main():
+    async with AsyncSessionLocal() as session:
+        for obj in ("public.kr_candles_1m", "public.kr_candles_1h", "public.kr_symbol_universe"):
+            val = (await session.execute(text("SELECT to_regclass(:n)"), {"n": obj})).scalar_one()
+            if not val:
+                raise SystemExit(f"Missing DB object: {obj}")
+        universe_cnt = (await session.execute(text("SELECT COUNT(*) FROM public.kr_symbol_universe WHERE is_active = true"))).scalar_one()
+        if int(universe_cnt) == 0:
+            raise SystemExit("kr_symbol_universe has no active rows")
+        print("preflight_ok")
+
+asyncio.run(main())
+PY
+```
+
+**Step 2: Run check to verify preconditions**
+
+Run: execute the command above.
+Expected: `preflight_ok` is printed.
+If fail: record the blocker and move to Task 1 Step 3.
+
+**Step 3: Resolve blockers minimally**
+
+Run exactly what is needed:
+- Migration required: `uv run alembic upgrade head`
+- Universe missing/stale: `make sync-kr-symbol-universe`
+
+**Step 4: Re-run preflight**
+
+Run: re-run the Step 1 command.
+Expected: PASS (`preflight_ok`)
+
+**Step 5: Commit**
+
+```bash
+# Skip the commit when there are no code changes (no-op).
+git status --short
+```
+
+---
+
+### Task 2: Execute Real Backfill (3 Sessions)
+
+**Files:**
+- Run: `scripts/sync_kr_candles.py`
+- Reference: `app/jobs/kr_candles.py`
+
+**Step 1: Write a failing runtime assertion target**
+
+Success condition: script exit code `0` and a payload `status` of `completed`.
+
+**Step 2: Run backfill command**
+
+Run:
+
+```bash
+uv run python scripts/sync_kr_candles.py --mode backfill --sessions 3
+```
+
+Expected: exit code `0`, with `KR candles sync completed` in the logs.
+
+**Step 3: If failure, debug with bounded evidence (@systematic-debugging)**
+
+Run:
+
+```bash
+uv run python scripts/sync_kr_candles.py --mode backfill --sessions 1
+```
+
+Expected: confirm whether the same failure reproduces at this minimal scope.
+Classify the failure as `precondition`, `KIS API`, or `DB upsert`, then resolve one cause at a time.
+
+**Step 4: Re-run target scope**
+
+Run:
+
+```bash
+uv run python scripts/sync_kr_candles.py --mode backfill --sessions 3
+```
+
+Expected: PASS (`completed`)
+
+**Step 5: Commit**
+
+```bash
+# Skip the commit when there are no code changes (no-op).
+git status --short
+```
+
+---
+
+### Task 3: Validate Success Criteria (1m Loaded + 1h Available)
+
+**Files:**
+- Reference: `scripts/sql/kr_candles_timescale.sql`
+
+**Step 1: Write failing SQL assertions**
+
+```sql
+SELECT symbol, venue, COUNT(*) AS cnt, MIN(time) AS min_time, MAX(time) AS max_time
+FROM public.kr_candles_1m
+GROUP BY symbol, venue
+ORDER BY cnt DESC
+LIMIT 20;
+```
+
+```sql
+SELECT symbol, bucket, open, high, low, close, volume, value, venues
+FROM public.kr_candles_1h
+ORDER BY bucket DESC
+LIMIT 50;
+```
+
+**Step 2: Run validation queries**
+
+Run via a local SQL client (`psql`, DBeaver, TablePlus) against the same `DATABASE_URL`.
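+
+For example, a minimal in-repo check reusing the preflight pattern from Task 1 (a
+sketch; it assumes the app's `AsyncSessionLocal` points at that same database):
+
+```bash
+uv run python - <<'PY'
+import asyncio
+from sqlalchemy import text
+from app.core.db import AsyncSessionLocal
+
+async def main():
+    async with AsyncSessionLocal() as session:
+        # Same assertion as the first SQL block above: per symbol/venue row counts.
+        result = await session.execute(text(
+            "SELECT symbol, venue, COUNT(*) AS cnt "
+            "FROM public.kr_candles_1m "
+            "GROUP BY symbol, venue ORDER BY cnt DESC LIMIT 20"
+        ))
+        for row in result:
+            print(row)
+
+asyncio.run(main())
+PY
+```
+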
+
+Expected:
+- The first query returns meaningful rows with `cnt > 0`
+- The second query returns recent `bucket` rows
+
+**Step 3: Optional symbol-level assertion**
+
+```sql
+SELECT
+    (SELECT COUNT(*) FROM public.kr_candles_1m WHERE symbol = '005930') AS m1_rows,
+    (SELECT COUNT(*) FROM public.kr_candles_1h WHERE symbol = '005930') AS h1_rows;
+```
+
+Expected: `m1_rows > 0` and `h1_rows > 0`
+
+**Step 4: Final verification gate (@verification-before-completion)**
+
+Pass criteria:
+1. `kr_candles_1m` rows confirmed loaded
+2. `kr_candles_1h` hourly candles confirmed queryable
+
+On failure, return to Task 2/Task 1, resolve the cause, then re-validate.
+
+**Step 5: Commit**
+
+```bash
+# Skip the commit if there are no code changes (no-op)
+git status --short
+```
diff --git a/docs/plans/2026-02-23-kr-ohlcv-1h-v2.md b/docs/plans/2026-02-23-kr-ohlcv-1h-v2.md
new file mode 100644
index 00000000..02d58a00
--- /dev/null
+++ b/docs/plans/2026-02-23-kr-ohlcv-1h-v2.md
@@ -0,0 +1,86 @@
+# KR get_ohlcv(period=1h) v2 Implementation Plan
+
+> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
+
+**Goal:** Make KR `get_ohlcv(period="1h")` read hourly candles from Timescale (`public.kr_candles_1h`), rebuild the in-progress hour from the minute DB + KIS minute API without double-counting, and expose `session` and `venues` fields only on KR 1h rows.
+
+**Architecture:** Add a dedicated read service (`app/services/kr_hourly_candles_read_service.py`) that performs DB-first reads and optionally rebuilds the current hour by merging minute rows from `public.kr_candles_1m` with the KIS minute API (`n=30`). MCP `get_ohlcv` delegates KR 1h to this service; tests + docs reflect the new contract.
+
+**Tech Stack:** FastAPI + MCP tools, async SQLAlchemy engine, pandas, pytest.
+
+---
+
+### Task 1: Fix typing/LSP issues in MCP integration tests
+
+**Files:**
+- Modify: `tests/test_mcp_server_tools.py`
+
+**Step 1: Run LSP diagnostics and list error locations**
+
+Run: `uv run python -c "print('use IDE lsp diagnostics')"` (in practice, run the repo LSP tool)
+Expected: a small set of type errors (indicators list typing, Optional Request, MockFastInfo attribute typing, register_all_tools typing).
+
+**Step 2: Add minimal type-safe casts or local TypedDicts (no blanket `Any` casts, no `type: ignore`)**
+
+- Ensure any `indicators=[...]` passed into tools uses the declared indicator type (use the same constants/types as production).
+- Replace `None` request values by constructing a real request object, or adjust the helper to accept `Optional`.
+- Replace ad-hoc attribute assignment on `MockFastInfo` with a simple dataclass/attrs object or `types.SimpleNamespace` with explicit attributes.
+- Fix `register_all_tools(mcp)` typing by aligning `DummyMCP` with the expected protocol or casting to `Any` at the call boundary.
+
+**Step 3: Re-run LSP diagnostics**
+
+Expected: 0 errors for `tests/test_mcp_server_tools.py`.
+
+### Task 2: Complete MCP KR 1h tests for new schema
+
+**Files:**
+- Modify: `tests/test_mcp_server_tools.py`
+
+**Step 1: Add/adjust KR 1h MCP test**
+
+- Mock `app.mcp_server.tooling.market_data_quotes.read_kr_hourly_candles_1h` to return a DataFrame with the full expected columns, including `session` and `venues`.
+- Assert KR 1h MCP response rows include those keys.
+
+**Step 2: Assert other market/period schemas unchanged**
+
+- Add a small test that calls another period (e.g., KR 1m or US 1h) and asserts `session`/`venues` are not present (or remain in their prior format).
+
+**Step 3: Ensure KR 1h does not use Redis cache**
+
+- Mock `kis_ohlcv_cache` and assert it is not invoked on the KR 1h code path; a condensed sketch follows.
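+
+A condensed sketch of the intended assertion; it mirrors `test_get_ohlcv_kr_1h_does_not_use_kis_ohlcv_cache` added later in this PR (`build_tools` and `market_data_quotes` come from the test module; the DataFrame fixture is trimmed to the minimum):
+
+```python
+import datetime
+from datetime import date
+from unittest.mock import AsyncMock
+
+import pandas as pd
+import pytest
+
+
+@pytest.mark.asyncio
+async def test_kr_1h_skips_cache(monkeypatch: pytest.MonkeyPatch) -> None:
+    tools = build_tools()  # helper defined in tests/test_mcp_server_tools.py
+    df = pd.DataFrame(
+        [
+            {
+                "datetime": pd.Timestamp("2026-02-23 09:00:00"),
+                "date": date(2026, 2, 23),
+                "time": datetime.time(9, 0, 0),
+                "open": 100.0, "high": 110.0, "low": 90.0, "close": 105.0,
+                "volume": 1000, "value": 105000.0,
+                "session": "REGULAR", "venues": ["KRX"],
+            }
+        ]
+    )
+    # Any cache read fails the test immediately.
+    monkeypatch.setattr(
+        market_data_quotes.kis_ohlcv_cache,
+        "get_candles",
+        AsyncMock(side_effect=AssertionError("KR 1h must not use kis_ohlcv_cache")),
+    )
+    monkeypatch.setattr(
+        market_data_quotes,
+        "read_kr_hourly_candles_1h",
+        AsyncMock(return_value=df),
+    )
+    result = await tools["get_ohlcv"]("005930", market="kr", count=1, period="1h")
+    assert result["period"] == "1h"
+```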
+ +**Step 4: Run focused pytest** + +Run: `uv run pytest --no-cov tests/test_mcp_server_tools.py -k "get_ohlcv and kr and 1h" -q` +Expected: PASS. + +### Task 3: Update MCP README for KR 1h v2 behavior + +**Files:** +- Modify: `app/mcp_server/README.md` + +**Step 1: Document KR 1h data sources** + +- History reads: `public.kr_candles_1h`. +- Current hour: rebuild from `kr_candles_1m` + KIS minute API (n=30) with dedup/merge. + +**Step 2: Document time windows and failure behavior** + +- `09:00 <= now < 15:35`: call KIS J + NX in parallel (if eligible); if either fails => error. +- Past `end_date`: DB-only. +- No Redis cache for KR 1h. + +**Step 3: Document output fields** + +- KR 1h rows only include `session` and `venues`. + +### Task 4: Full verification + +**Step 1: LSP diagnostics on changed files** + +Expected: 0 errors. + +**Step 2: Run lint + tests** + +Run: `make lint` then `make test` +Expected: exit code 0. diff --git a/scripts/sql/kr_candles_timescale.sql b/scripts/sql/kr_candles_timescale.sql new file mode 100644 index 00000000..dc52c4de --- /dev/null +++ b/scripts/sql/kr_candles_timescale.sql @@ -0,0 +1,164 @@ +DO $$ +DECLARE + v_extversion TEXT; + v_version_core TEXT; + v_parts TEXT[]; + v_major INTEGER; + v_minor INTEGER; + v_patch INTEGER; +BEGIN + SELECT extversion + INTO v_extversion + FROM pg_extension + WHERE extname = 'timescaledb'; + + IF v_extversion IS NULL THEN + RAISE EXCEPTION 'timescaledb extension is not installed'; + END IF; + + v_version_core := split_part(v_extversion, '-', 1); + v_parts := regexp_split_to_array(v_version_core, '\.'); + + v_major := COALESCE(v_parts[1], '0')::INTEGER; + v_minor := COALESCE(v_parts[2], '0')::INTEGER; + v_patch := COALESCE(v_parts[3], '0')::INTEGER; + + IF (v_major, v_minor, v_patch) < (2, 8, 1) THEN + RAISE EXCEPTION + 'timescaledb extension version % is below required minimum 2.8.1', + v_extversion; + END IF; +END +$$; + +CREATE TABLE public.kr_candles_1m ( + time TIMESTAMPTZ NOT NULL, + symbol TEXT NOT NULL, + venue TEXT NOT NULL, + open NUMERIC NOT NULL, + high NUMERIC NOT NULL, + low NUMERIC NOT NULL, + close NUMERIC NOT NULL, + volume NUMERIC NOT NULL, + value NUMERIC NOT NULL, + CONSTRAINT ck_kr_candles_1m_venue CHECK (venue IN ('KRX', 'NTX')), + CONSTRAINT uq_kr_candles_1m_time_symbol_venue UNIQUE (time, symbol, venue) +); + +SELECT create_hypertable( + 'public.kr_candles_1m', + 'time', + migrate_data => TRUE +); + +CREATE INDEX ix_kr_candles_1m_symbol_time_desc + ON public.kr_candles_1m (symbol, time DESC); + +CREATE MATERIALIZED VIEW public.kr_candles_1h +WITH ( + timescaledb.continuous, + timescaledb.materialized_only = false +) +AS +SELECT + time_bucket(INTERVAL '1 hour', time, 'Asia/Seoul') AS bucket, + symbol, + FIRST( + open, + ((extract(epoch from time) * 1000000)::bigint * 2 + + CASE WHEN venue = 'KRX' THEN 0 ELSE 1 END) + ) AS open, + MAX(high) AS high, + MIN(low) AS low, + LAST( + close, + ((extract(epoch from time) * 1000000)::bigint * 2 + + CASE WHEN venue = 'KRX' THEN 1 ELSE 0 END) + ) AS close, + SUM(volume) AS volume, + SUM(value) AS value, + array_agg(DISTINCT venue ORDER BY venue) AS venues +FROM public.kr_candles_1m +GROUP BY bucket, symbol +WITH NO DATA; + +DO $$ +BEGIN + IF to_regclass('public.kr_candles_1h') IS NOT NULL THEN + EXECUTE $sql$ + SELECT remove_continuous_aggregate_policy( + 'public.kr_candles_1h', + if_exists => TRUE + ) + $sql$; + + EXECUTE $sql$ + SELECT add_continuous_aggregate_policy( + 'public.kr_candles_1h', + start_offset => INTERVAL '2 days', + end_offset => INTERVAL '1 hour', 
+ schedule_interval => INTERVAL '5 minutes' + ) + $sql$; + END IF; +END +$$; + +DO $$ +BEGIN + IF to_regclass('public.kr_candles_1m') IS NOT NULL THEN + PERFORM remove_retention_policy( + 'public.kr_candles_1m', + if_exists => TRUE + ); + PERFORM add_retention_policy( + 'public.kr_candles_1m', + INTERVAL '90 days' + ); + END IF; + + IF to_regclass('public.kr_candles_1h') IS NOT NULL THEN + PERFORM remove_retention_policy( + 'public.kr_candles_1h', + if_exists => TRUE + ); + PERFORM add_retention_policy( + 'public.kr_candles_1h', + INTERVAL '90 days' + ); + END IF; +END +$$; + +DO $$ +DECLARE + v_start TIMESTAMPTZ; + v_end TIMESTAMPTZ; + v_refresh_end TIMESTAMPTZ; +BEGIN + IF to_regclass('public.kr_candles_1h') IS NULL THEN + RETURN; + END IF; + + SELECT MIN(time), MAX(time) + INTO v_start, v_end + FROM public.kr_candles_1m; + + IF v_start IS NOT NULL AND v_end IS NOT NULL THEN + v_refresh_end := LEAST( + v_end + INTERVAL '1 hour', + date_trunc('hour', now() - INTERVAL '1 hour') + ); + + IF v_refresh_end <= v_start THEN + RETURN; + END IF; + + CALL refresh_continuous_aggregate( + 'public.kr_candles_1h', + v_start, + v_refresh_end + ); + END IF; +END +$$; diff --git a/scripts/sync_kr_candles.py b/scripts/sync_kr_candles.py new file mode 100644 index 00000000..0fc78684 --- /dev/null +++ b/scripts/sync_kr_candles.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import argparse +import asyncio +import logging + +from app.core.config import settings +from app.jobs.kr_candles import run_kr_candles_sync +from app.monitoring.sentry import capture_exception, init_sentry + +logger = logging.getLogger(__name__) + + +def _build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Sync KR candles (1m/1h pipeline source)" + ) + parser.add_argument( + "--mode", + choices=["incremental", "backfill"], + default="incremental", + help="Sync mode (default: incremental)", + ) + parser.add_argument( + "--sessions", + type=int, + default=10, + help="Backfill trading sessions (default: 10)", + ) + parser.add_argument( + "--user-id", + type=int, + default=1, + help="Manual holdings user id (default: 1)", + ) + return parser + + +async def main(argv: list[str] | None = None) -> int: + parser = _build_parser() + args = parser.parse_args(argv) + + logging.basicConfig( + level=getattr(logging, settings.LOG_LEVEL.upper(), logging.INFO), + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + ) + init_sentry(service_name="kr-candles-sync") + + try: + result = await run_kr_candles_sync( + mode=args.mode, + sessions=max(args.sessions, 1), + user_id=args.user_id, + ) + except Exception as exc: + capture_exception(exc, process="sync_kr_candles") + logger.error("KR candles sync crashed: %s", exc, exc_info=True) + return 1 + + if result.get("status") != "completed": + logger.error("KR candles sync failed: %s", result) + return 1 + + logger.info("KR candles sync completed: %s", result) + return 0 + + +if __name__ == "__main__": + raise SystemExit(asyncio.run(main())) diff --git a/tests/test_kr_candles_sync.py b/tests/test_kr_candles_sync.py new file mode 100644 index 00000000..5a01f2b0 --- /dev/null +++ b/tests/test_kr_candles_sync.py @@ -0,0 +1,276 @@ +from __future__ import annotations + +from datetime import UTC, datetime +from pathlib import Path +from types import SimpleNamespace +from unittest.mock import AsyncMock, MagicMock + +import pytest + + +def _make_universe_row( + symbol: str, + *, + nxt_eligible: bool, + is_active: bool, +): + from 
app.models.kr_symbol_universe import KRSymbolUniverse + + return KRSymbolUniverse( + symbol=symbol, + name=f"NAME-{symbol}", + exchange="KOSPI", + nxt_eligible=nxt_eligible, + is_active=is_active, + ) + + +def test_build_symbol_union_combines_kis_and_manual_symbols() -> None: + from app.services import kr_candles_sync_service as svc + + kis_holdings = [ + {"pdno": "5930"}, + {"pdno": "035420"}, + {"pdno": None}, + ] + manual_holdings = [ + SimpleNamespace(ticker="005930"), + SimpleNamespace(ticker="000660"), + ] + + symbols = svc._build_symbol_union(kis_holdings, manual_holdings) + + assert symbols == {"005930", "035420", "000660"} + + +def test_validate_universe_rows_fails_when_table_empty() -> None: + from app.services import kr_candles_sync_service as svc + + with pytest.raises(ValueError, match="kr_symbol_universe is empty"): + svc._validate_universe_rows( + target_symbols={"005930"}, + universe_rows=[], + table_has_rows=False, + ) + + +def test_validate_universe_rows_fails_when_symbol_missing() -> None: + from app.services import kr_candles_sync_service as svc + + row = _make_universe_row("005930", nxt_eligible=True, is_active=True) + + with pytest.raises(ValueError, match="not registered"): + svc._validate_universe_rows( + target_symbols={"005930", "000660"}, + universe_rows=[row], + table_has_rows=True, + ) + + +def test_validate_universe_rows_fails_when_symbol_inactive() -> None: + from app.services import kr_candles_sync_service as svc + + inactive_row = _make_universe_row("005930", nxt_eligible=False, is_active=False) + + with pytest.raises(ValueError, match="inactive"): + svc._validate_universe_rows( + target_symbols={"005930"}, + universe_rows=[inactive_row], + table_has_rows=True, + ) + + +def test_build_venue_plan_maps_dual_and_single_venues() -> None: + from app.services import kr_candles_sync_service as svc + + rows_by_symbol = { + "005930": _make_universe_row("005930", nxt_eligible=True, is_active=True), + "000660": _make_universe_row("000660", nxt_eligible=False, is_active=True), + } + + plan = svc._build_venue_plan(rows_by_symbol) + + assert [v.venue for v in plan["005930"]] == ["KRX", "NTX"] + assert [v.market_code for v in plan["005930"]] == ["J", "NX"] + assert [v.venue for v in plan["000660"]] == ["KRX"] + + +def test_should_process_venue_skips_holiday_in_incremental_mode() -> None: + from app.services import kr_candles_sync_service as svc + + venue = svc._VENUE_CONFIG["KRX"] + now_kst = datetime(2026, 1, 1, 10, 0, tzinfo=svc._KST) + + should_process, reason = svc._should_process_venue( + mode="incremental", + now_kst=now_kst, + is_session_day=False, + venue=venue, + ) + + assert should_process is False + assert reason == "holiday" + + +def test_should_process_venue_skips_outside_session_in_incremental_mode() -> None: + from app.services import kr_candles_sync_service as svc + + venue = svc._VENUE_CONFIG["KRX"] + now_kst = datetime(2026, 2, 23, 8, 10, tzinfo=svc._KST) + + should_process, reason = svc._should_process_venue( + mode="incremental", + now_kst=now_kst, + is_session_day=True, + venue=venue, + ) + + assert should_process is False + assert reason == "outside_session" + + +def test_compute_incremental_cutoff_uses_five_minute_overlap() -> None: + from app.services import kr_candles_sync_service as svc + + cursor_utc = datetime(2026, 2, 23, 1, 30, tzinfo=UTC) + + cutoff_kst = svc._compute_incremental_cutoff_kst(cursor_utc) + + assert cutoff_kst is not None + assert cutoff_kst.tzinfo == svc._KST + assert cutoff_kst.strftime("%Y-%m-%d %H:%M:%S") == "2026-02-23 
10:25:00" + + +def test_convert_kis_datetime_to_utc_interprets_naive_as_kst() -> None: + from app.services import kr_candles_sync_service as svc + + converted = svc._convert_kis_datetime_to_utc(datetime(2026, 2, 23, 9, 0, 0)) + + assert converted.tzinfo == UTC + assert converted.strftime("%Y-%m-%d %H:%M:%S") == "2026-02-23 00:00:00" + + +@pytest.mark.asyncio +async def test_run_kr_candles_sync_success_payload( + monkeypatch: pytest.MonkeyPatch, +) -> None: + from app.jobs import kr_candles + + monkeypatch.setattr( + kr_candles, + "sync_kr_candles", + AsyncMock(return_value={"mode": "incremental", "rows_upserted": 11}), + ) + + result = await kr_candles.run_kr_candles_sync(mode="incremental") + + assert result["status"] == "completed" + assert result["mode"] == "incremental" + assert result["rows_upserted"] == 11 + + +@pytest.mark.asyncio +async def test_run_kr_candles_sync_failure_payload( + monkeypatch: pytest.MonkeyPatch, +) -> None: + from app.jobs import kr_candles + + monkeypatch.setattr( + kr_candles, + "sync_kr_candles", + AsyncMock(side_effect=RuntimeError("sync failure")), + ) + + result = await kr_candles.run_kr_candles_sync(mode="incremental") + + assert result["status"] == "failed" + assert "sync failure" in str(result["error"]) + + +@pytest.mark.asyncio +async def test_task_payload_success(monkeypatch: pytest.MonkeyPatch) -> None: + from app.tasks import kr_candles_tasks + + monkeypatch.setattr( + kr_candles_tasks, + "run_kr_candles_sync", + AsyncMock(return_value={"status": "completed", "rows_upserted": 3}), + ) + + result = await kr_candles_tasks.sync_kr_candles_incremental_task() + + assert result["status"] == "completed" + assert result["rows_upserted"] == 3 + + +@pytest.mark.asyncio +async def test_task_payload_failure(monkeypatch: pytest.MonkeyPatch) -> None: + from app.tasks import kr_candles_tasks + + monkeypatch.setattr( + kr_candles_tasks, + "run_kr_candles_sync", + AsyncMock(side_effect=RuntimeError("task crash")), + ) + + result = await kr_candles_tasks.sync_kr_candles_incremental_task() + + assert result["status"] == "failed" + assert "task crash" in str(result["error"]) + + +@pytest.mark.asyncio +async def test_script_main_exit_codes(monkeypatch: pytest.MonkeyPatch) -> None: + from scripts import sync_kr_candles + + monkeypatch.setattr(sync_kr_candles, "init_sentry", lambda **_: None) + monkeypatch.setattr( + sync_kr_candles, + "run_kr_candles_sync", + AsyncMock(return_value={"status": "completed", "rows_upserted": 1}), + ) + success_code = await sync_kr_candles.main(["--mode", "incremental"]) + assert success_code == 0 + + monkeypatch.setattr( + sync_kr_candles, + "run_kr_candles_sync", + AsyncMock(return_value={"status": "failed", "error": "boom"}), + ) + failed_status_code = await sync_kr_candles.main(["--mode", "incremental"]) + assert failed_status_code == 1 + + capture_mock = MagicMock() + monkeypatch.setattr(sync_kr_candles, "capture_exception", capture_mock) + monkeypatch.setattr( + sync_kr_candles, + "run_kr_candles_sync", + AsyncMock(side_effect=RuntimeError("hard crash")), + ) + exception_code = await sync_kr_candles.main(["--mode", "incremental"]) + assert exception_code == 1 + capture_mock.assert_called_once() + + +def test_new_retention_migration_contains_upgrade_and_downgrade_policy_sql() -> None: + versions_dir = Path("alembic/versions") + matches = sorted(versions_dir.glob("*_add_kr_candles_retention_policy.py")) + assert matches, "retention migration file is missing" + + content = matches[-1].read_text(encoding="utf-8") + + assert 
"add_retention_policy" in content + assert "remove_retention_policy" in content + assert "kr_candles_1m" in content + assert "kr_candles_1h" in content + assert "90 days" in content + + +def test_sql_script_contains_90_day_retention_policy_for_both_tables() -> None: + content = Path("scripts/sql/kr_candles_timescale.sql").read_text(encoding="utf-8") + + assert "add_retention_policy" in content + assert "remove_retention_policy" in content + assert "public.kr_candles_1m" in content + assert "public.kr_candles_1h" in content + assert "90 days" in content diff --git a/tests/test_kr_hourly_candles_read_service.py b/tests/test_kr_hourly_candles_read_service.py new file mode 100644 index 00000000..bb4da3f0 --- /dev/null +++ b/tests/test_kr_hourly_candles_read_service.py @@ -0,0 +1,875 @@ +from __future__ import annotations + +import datetime +from types import SimpleNamespace +from unittest.mock import AsyncMock + +import pandas as pd +import pytest + + +class _ScalarResult: + def __init__(self, value): + self._value = value + + def scalar_one_or_none(self): + return self._value + + +class _MappingsResult: + def __init__(self, rows: list[dict[str, object]]): + self._rows = list(rows) + + def mappings(self): + return self + + def all(self): + return list(self._rows) + + +class DummySessionManager: + def __init__(self, session): + self._session = session + + async def __aenter__(self): + return self._session + + async def __aexit__(self, exc_type, exc, tb): + return None + + +def _dt_kst(y: int, m: int, d: int, hh: int, mm: int, ss: int = 0) -> datetime.datetime: + return datetime.datetime( + y, m, d, hh, mm, ss, tzinfo=datetime.timezone(datetime.timedelta(hours=9)) + ) + + +def _make_hour_row( + *, + bucket_kst_naive: datetime.datetime, + open: float, + high: float, + low: float, + close: float, + volume: float, + value: float, + venues: list[str], +) -> dict[str, object]: + return { + "bucket": bucket_kst_naive.replace( + tzinfo=datetime.timezone(datetime.timedelta(hours=9)) + ), + "open": open, + "high": high, + "low": low, + "close": close, + "volume": volume, + "value": value, + "venues": venues, + } + + +def _make_minute_row( + *, + time_kst: datetime.datetime, + venue: str, + open: float, + high: float, + low: float, + close: float, + volume: float, + value: float, +) -> dict[str, object]: + return { + "time": time_kst.astimezone(datetime.UTC), + "venue": venue, + "open": open, + "high": high, + "low": low, + "close": close, + "volume": volume, + "value": value, + } + + +@pytest.mark.asyncio +async def test_api_prefetch_plan_time_boundaries_for_nxt_eligible(monkeypatch): + from app.services import kr_hourly_candles_read_service as svc + + symbol = "005930" + + minute_rows = [ + _make_minute_row( + time_kst=_dt_kst(2026, 2, 23, 8, 0, 0), + venue="NTX", + open=1.0, + high=1.0, + low=1.0, + close=1.0, + volume=1.0, + value=1.0, + ), + _make_minute_row( + time_kst=_dt_kst(2026, 2, 23, 9, 0, 0), + venue="KRX", + open=1.0, + high=1.0, + low=1.0, + close=1.0, + volume=1.0, + value=1.0, + ), + _make_minute_row( + time_kst=_dt_kst(2026, 2, 23, 15, 0, 0), + venue="KRX", + open=1.0, + high=1.0, + low=1.0, + close=1.0, + volume=1.0, + value=1.0, + ), + _make_minute_row( + time_kst=_dt_kst(2026, 2, 23, 20, 0, 0), + venue="NTX", + open=1.0, + high=1.0, + low=1.0, + close=1.0, + volume=1.0, + value=1.0, + ), + ] + + class DummyDB: + def __init__(self): + self.calls: list[str] = [] + + async def execute(self, query, params=None): + sql = str(getattr(query, "text", query)) + self.calls.append(sql) + if 
"FROM public.kr_symbol_universe" in sql and "LIMIT 1" in sql: + return _ScalarResult(symbol) + if "FROM public.kr_symbol_universe" in sql and "WHERE symbol" in sql: + return _MappingsResult( + [ + { + "symbol": symbol, + "nxt_eligible": True, + "is_active": True, + } + ] + ) + if "FROM public.kr_candles_1h" in sql: + return _MappingsResult([]) + if "FROM public.kr_candles_1m" in sql: + target_rows = list(minute_rows) + if isinstance(params, dict) and params.get("start_time") is not None: + start = params["start_time"] + end = params["end_time"] + target_rows = [ + row for row in target_rows if start <= row["time"] < end + ] + return _MappingsResult(target_rows) + raise AssertionError(f"unexpected sql: {sql}") + + db = DummyDB() + monkeypatch.setattr(svc, "AsyncSessionLocal", lambda: DummySessionManager(db)) + + class DummyKIS: + def __init__(self): + self.calls: list[str] = [] + + async def inquire_minute_chart( + self, *, code, market, time_unit, n, end_date=None + ): + del code, time_unit, n, end_date + self.calls.append(str(market)) + return pd.DataFrame( + columns=[ + "datetime", + "date", + "time", + "open", + "high", + "low", + "close", + "volume", + "value", + ] + ) + + kis = DummyKIS() + monkeypatch.setattr(svc, "KISClient", lambda: kis) + + await svc.read_kr_hourly_candles_1h( + symbol=symbol, + count=1, + end_date=None, + now_kst=_dt_kst(2026, 2, 23, 8, 0, 0), + ) + assert kis.calls == ["NX"] + kis.calls.clear() + + await svc.read_kr_hourly_candles_1h( + symbol=symbol, + count=1, + end_date=None, + now_kst=_dt_kst(2026, 2, 23, 9, 0, 0), + ) + assert sorted(kis.calls) == ["J", "NX"] + kis.calls.clear() + + await svc.read_kr_hourly_candles_1h( + symbol=symbol, + count=1, + end_date=None, + now_kst=_dt_kst(2026, 2, 23, 15, 34, 0), + ) + assert sorted(kis.calls) == ["J", "NX"] + kis.calls.clear() + + await svc.read_kr_hourly_candles_1h( + symbol=symbol, + count=1, + end_date=None, + now_kst=_dt_kst(2026, 2, 23, 15, 35, 0), + ) + assert kis.calls == ["NX"] + kis.calls.clear() + + await svc.read_kr_hourly_candles_1h( + symbol=symbol, + count=1, + end_date=None, + now_kst=_dt_kst(2026, 2, 23, 20, 0, 0), + ) + assert kis.calls == [] + + +@pytest.mark.asyncio +async def test_api_prefetch_plan_respects_nxt_ineligible(monkeypatch): + from app.services import kr_hourly_candles_read_service as svc + + symbol = "005930" + + minute_rows = [ + _make_minute_row( + time_kst=_dt_kst(2026, 2, 23, 8, 0, 0), + venue="KRX", + open=1.0, + high=1.0, + low=1.0, + close=1.0, + volume=1.0, + value=1.0, + ), + _make_minute_row( + time_kst=_dt_kst(2026, 2, 23, 10, 0, 0), + venue="KRX", + open=1.0, + high=1.0, + low=1.0, + close=1.0, + volume=1.0, + value=1.0, + ), + _make_minute_row( + time_kst=_dt_kst(2026, 2, 23, 16, 0, 0), + venue="KRX", + open=1.0, + high=1.0, + low=1.0, + close=1.0, + volume=1.0, + value=1.0, + ), + ] + + class DummyDB: + async def execute(self, query, params=None): + sql = str(getattr(query, "text", query)) + if "FROM public.kr_symbol_universe" in sql and "LIMIT 1" in sql: + return _ScalarResult(symbol) + if "FROM public.kr_symbol_universe" in sql and "WHERE symbol" in sql: + return _MappingsResult( + [ + { + "symbol": symbol, + "nxt_eligible": False, + "is_active": True, + } + ] + ) + if "FROM public.kr_candles_1h" in sql: + return _MappingsResult([]) + if "FROM public.kr_candles_1m" in sql: + target_rows = list(minute_rows) + if isinstance(params, dict) and params.get("start_time") is not None: + start = params["start_time"] + end = params["end_time"] + target_rows = [ + row for row 
in target_rows if start <= row["time"] < end + ] + return _MappingsResult(target_rows) + raise AssertionError(f"unexpected sql: {sql}") + + monkeypatch.setattr( + svc, "AsyncSessionLocal", lambda: DummySessionManager(DummyDB()) + ) + + kis = SimpleNamespace(inquire_minute_chart=AsyncMock()) + monkeypatch.setattr(svc, "KISClient", lambda: kis) + + # 08:00-09:00 NX-only time window but nxt_eligible=false -> API 0 + await svc.read_kr_hourly_candles_1h( + symbol=symbol, + count=1, + end_date=None, + now_kst=_dt_kst(2026, 2, 23, 8, 10, 0), + ) + kis.inquire_minute_chart.assert_not_awaited() + + await svc.read_kr_hourly_candles_1h( + symbol=symbol, + count=1, + end_date=None, + now_kst=_dt_kst(2026, 2, 23, 10, 0, 0), + ) + kis.inquire_minute_chart.assert_awaited_once() + assert kis.inquire_minute_chart.await_args.kwargs["market"] == "J" + + kis.inquire_minute_chart.reset_mock() + + await svc.read_kr_hourly_candles_1h( + symbol=symbol, + count=1, + end_date=None, + now_kst=_dt_kst(2026, 2, 23, 16, 0, 0), + ) + kis.inquire_minute_chart.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_end_date_in_past_disables_api(monkeypatch): + from app.services import kr_hourly_candles_read_service as svc + + symbol = "005930" + + hour_rows = [ + _make_hour_row( + bucket_kst_naive=datetime.datetime(2026, 2, 20, 10, 0, 0), + open=1.0, + high=1.0, + low=1.0, + close=1.0, + volume=1.0, + value=1.0, + venues=["KRX"], + ) + ] + + class DummyDB: + async def execute(self, query, params=None): + sql = str(getattr(query, "text", query)) + if "FROM public.kr_symbol_universe" in sql and "LIMIT 1" in sql: + return _ScalarResult(symbol) + if "FROM public.kr_symbol_universe" in sql and "WHERE symbol" in sql: + return _MappingsResult( + [ + { + "symbol": symbol, + "nxt_eligible": True, + "is_active": True, + } + ] + ) + if "FROM public.kr_candles_1h" in sql: + return _MappingsResult(hour_rows) + if "FROM public.kr_candles_1m" in sql: + return _MappingsResult([]) + raise AssertionError(f"unexpected sql: {sql}") + + monkeypatch.setattr( + svc, "AsyncSessionLocal", lambda: DummySessionManager(DummyDB()) + ) + + kis = SimpleNamespace(inquire_minute_chart=AsyncMock()) + monkeypatch.setattr(svc, "KISClient", lambda: kis) + + await svc.read_kr_hourly_candles_1h( + symbol=symbol, + count=1, + end_date=_dt_kst(2026, 2, 20, 0, 0, 0), + now_kst=_dt_kst(2026, 2, 23, 10, 0, 0), + ) + + kis.inquire_minute_chart.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_current_hour_is_reaggregated_from_minutes_not_from_db_hour(monkeypatch): + from app.services import kr_hourly_candles_read_service as svc + + symbol = "005930" + now_kst = _dt_kst(2026, 2, 23, 10, 10, 0) + current_bucket = datetime.datetime(2026, 2, 23, 10, 0, 0) + + hour_rows = [ + _make_hour_row( + bucket_kst_naive=current_bucket, + open=1.0, + high=1.0, + low=1.0, + close=1.0, + volume=1.0, + value=1.0, + venues=["KRX"], + ) + ] + + minute_rows = [ + _make_minute_row( + time_kst=_dt_kst(2026, 2, 23, 10, 0, 0), + venue="KRX", + open=100.0, + high=101.0, + low=99.0, + close=100.5, + volume=10.0, + value=1000.0, + ), + _make_minute_row( + time_kst=_dt_kst(2026, 2, 23, 10, 1, 0), + venue="KRX", + open=100.5, + high=102.0, + low=100.0, + close=101.0, + volume=20.0, + value=2000.0, + ), + ] + + class DummyDB: + async def execute(self, query, params=None): + sql = str(getattr(query, "text", query)) + if "FROM public.kr_symbol_universe" in sql and "LIMIT 1" in sql: + return _ScalarResult(symbol) + if "FROM public.kr_symbol_universe" in sql and "WHERE symbol" in 
sql: + return _MappingsResult( + [ + { + "symbol": symbol, + "nxt_eligible": False, + "is_active": True, + } + ] + ) + if "FROM public.kr_candles_1h" in sql: + return _MappingsResult(hour_rows) + if "FROM public.kr_candles_1m" in sql: + return _MappingsResult(minute_rows) + raise AssertionError(f"unexpected sql: {sql}") + + monkeypatch.setattr( + svc, "AsyncSessionLocal", lambda: DummySessionManager(DummyDB()) + ) + + kis = SimpleNamespace(inquire_minute_chart=AsyncMock(return_value=pd.DataFrame())) + monkeypatch.setattr(svc, "KISClient", lambda: kis) + + out = await svc.read_kr_hourly_candles_1h( + symbol=symbol, + count=1, + end_date=None, + now_kst=now_kst, + ) + + assert len(out) == 1 + row = out.iloc[0] + assert row["datetime"] == current_bucket + assert row["open"] == 100.0 + assert row["high"] == 102.0 + assert row["low"] == 99.0 + assert row["close"] == 101.0 + assert row["volume"] == 30.0 + assert row["value"] == 3000.0 + + +@pytest.mark.asyncio +async def test_api_overrides_db_minutes_for_same_minute_and_venue(monkeypatch): + from app.services import kr_hourly_candles_read_service as svc + + symbol = "005930" + now_kst = _dt_kst(2026, 2, 23, 10, 10, 0) + current_bucket = datetime.datetime(2026, 2, 23, 10, 0, 0) + + hour_rows: list[dict[str, object]] = [] + minute_rows = [ + _make_minute_row( + time_kst=_dt_kst(2026, 2, 23, 10, 0, 0), + venue="KRX", + open=100.0, + high=100.0, + low=100.0, + close=100.0, + volume=10.0, + value=1000.0, + ) + ] + + class DummyDB: + async def execute(self, query, params=None): + sql = str(getattr(query, "text", query)) + if "FROM public.kr_symbol_universe" in sql and "LIMIT 1" in sql: + return _ScalarResult(symbol) + if "FROM public.kr_symbol_universe" in sql and "WHERE symbol" in sql: + return _MappingsResult( + [ + { + "symbol": symbol, + "nxt_eligible": False, + "is_active": True, + } + ] + ) + if "FROM public.kr_candles_1h" in sql: + return _MappingsResult(hour_rows) + if "FROM public.kr_candles_1m" in sql: + return _MappingsResult(minute_rows) + raise AssertionError(f"unexpected sql: {sql}") + + monkeypatch.setattr( + svc, "AsyncSessionLocal", lambda: DummySessionManager(DummyDB()) + ) + + api_df = pd.DataFrame( + [ + { + "datetime": pd.Timestamp("2026-02-23 10:00:00"), + "date": datetime.date(2026, 2, 23), + "time": datetime.time(10, 0, 0), + "open": 200.0, + "high": 200.0, + "low": 200.0, + "close": 200.0, + "volume": 1, + "value": 100, + } + ] + ) + + kis = SimpleNamespace(inquire_minute_chart=AsyncMock(return_value=api_df)) + monkeypatch.setattr(svc, "KISClient", lambda: kis) + + out = await svc.read_kr_hourly_candles_1h( + symbol=symbol, + count=1, + end_date=None, + now_kst=now_kst, + ) + + assert len(out) == 1 + row = out.iloc[0] + assert row["datetime"] == current_bucket + assert row["open"] == 200.0 + assert row["close"] == 200.0 + assert row["volume"] == 1.0 + + +@pytest.mark.asyncio +async def test_same_minute_both_venues_price_krx_priority_volume_sum(monkeypatch): + from app.services import kr_hourly_candles_read_service as svc + + symbol = "005930" + now_kst = _dt_kst(2026, 2, 23, 9, 10, 0) + current_bucket = datetime.datetime(2026, 2, 23, 9, 0, 0) + + hour_rows: list[dict[str, object]] = [] + minute_rows = [ + _make_minute_row( + time_kst=_dt_kst(2026, 2, 23, 9, 0, 0), + venue="KRX", + open=100.0, + high=110.0, + low=90.0, + close=105.0, + volume=10.0, + value=1000.0, + ), + _make_minute_row( + time_kst=_dt_kst(2026, 2, 23, 9, 0, 0), + venue="NTX", + open=200.0, + high=210.0, + low=190.0, + close=205.0, + volume=5.0, + value=500.0, 
+ ), + ] + + class DummyDB: + async def execute(self, query, params=None): + sql = str(getattr(query, "text", query)) + if "FROM public.kr_symbol_universe" in sql and "LIMIT 1" in sql: + return _ScalarResult(symbol) + if "FROM public.kr_symbol_universe" in sql and "WHERE symbol" in sql: + return _MappingsResult( + [ + { + "symbol": symbol, + "nxt_eligible": True, + "is_active": True, + } + ] + ) + if "FROM public.kr_candles_1h" in sql: + return _MappingsResult(hour_rows) + if "FROM public.kr_candles_1m" in sql: + return _MappingsResult(minute_rows) + raise AssertionError(f"unexpected sql: {sql}") + + monkeypatch.setattr( + svc, "AsyncSessionLocal", lambda: DummySessionManager(DummyDB()) + ) + kis = SimpleNamespace(inquire_minute_chart=AsyncMock(return_value=pd.DataFrame())) + monkeypatch.setattr(svc, "KISClient", lambda: kis) + + out = await svc.read_kr_hourly_candles_1h( + symbol=symbol, + count=1, + end_date=None, + now_kst=now_kst, + ) + assert len(out) == 1 + row = out.iloc[0] + assert row["datetime"] == current_bucket + assert row["open"] == 100.0 + assert row["close"] == 105.0 + assert row["volume"] == 15.0 + assert row["value"] == 1500.0 + assert row["venues"] == ["KRX", "NTX"] + + +@pytest.mark.asyncio +async def test_synthetic_current_hour_created_when_db_hour_missing(monkeypatch): + from app.services import kr_hourly_candles_read_service as svc + + symbol = "005930" + now_kst = _dt_kst(2026, 2, 23, 10, 10, 0) + current_bucket = datetime.datetime(2026, 2, 23, 10, 0, 0) + + hour_rows: list[dict[str, object]] = [] + minute_rows = [ + _make_minute_row( + time_kst=_dt_kst(2026, 2, 23, 10, 0, 0), + venue="KRX", + open=100.0, + high=101.0, + low=99.0, + close=100.5, + volume=10.0, + value=1000.0, + ) + ] + + class DummyDB: + async def execute(self, query, params=None): + sql = str(getattr(query, "text", query)) + if "FROM public.kr_symbol_universe" in sql and "LIMIT 1" in sql: + return _ScalarResult(symbol) + if "FROM public.kr_symbol_universe" in sql and "WHERE symbol" in sql: + return _MappingsResult( + [ + { + "symbol": symbol, + "nxt_eligible": False, + "is_active": True, + } + ] + ) + if "FROM public.kr_candles_1h" in sql: + return _MappingsResult(hour_rows) + if "FROM public.kr_candles_1m" in sql: + return _MappingsResult(minute_rows) + raise AssertionError(f"unexpected sql: {sql}") + + monkeypatch.setattr( + svc, "AsyncSessionLocal", lambda: DummySessionManager(DummyDB()) + ) + kis = SimpleNamespace(inquire_minute_chart=AsyncMock(return_value=pd.DataFrame())) + monkeypatch.setattr(svc, "KISClient", lambda: kis) + + out = await svc.read_kr_hourly_candles_1h( + symbol=symbol, + count=1, + end_date=None, + now_kst=now_kst, + ) + assert len(out) == 1 + assert out.iloc[0]["datetime"] == current_bucket + + +@pytest.mark.asyncio +async def test_session_and_venues_fields_present_and_labeled(monkeypatch): + from app.services import kr_hourly_candles_read_service as svc + + symbol = "005930" + now_kst = _dt_kst(2026, 2, 23, 8, 10, 0) + + hour_rows = [ + _make_hour_row( + bucket_kst_naive=datetime.datetime(2026, 2, 23, 8, 0, 0), + open=1.0, + high=1.0, + low=1.0, + close=1.0, + volume=1.0, + value=1.0, + venues=["NTX"], + ) + ] + + minute_rows = [ + _make_minute_row( + time_kst=_dt_kst(2026, 2, 23, 8, 0, 0), + venue="NTX", + open=1.0, + high=1.0, + low=1.0, + close=1.0, + volume=1.0, + value=1.0, + ) + ] + + class DummyDB: + async def execute(self, query, params=None): + sql = str(getattr(query, "text", query)) + if "FROM public.kr_symbol_universe" in sql and "LIMIT 1" in sql: + return 
_ScalarResult(symbol) + if "FROM public.kr_symbol_universe" in sql and "WHERE symbol" in sql: + return _MappingsResult( + [ + { + "symbol": symbol, + "nxt_eligible": True, + "is_active": True, + } + ] + ) + if "FROM public.kr_candles_1h" in sql: + return _MappingsResult(hour_rows) + if "FROM public.kr_candles_1m" in sql: + return _MappingsResult(minute_rows) + raise AssertionError(f"unexpected sql: {sql}") + + monkeypatch.setattr( + svc, "AsyncSessionLocal", lambda: DummySessionManager(DummyDB()) + ) + kis = SimpleNamespace(inquire_minute_chart=AsyncMock(return_value=pd.DataFrame())) + monkeypatch.setattr(svc, "KISClient", lambda: kis) + + out = await svc.read_kr_hourly_candles_1h( + symbol=symbol, + count=1, + end_date=None, + now_kst=now_kst, + ) + + row = out.iloc[0] + assert row["session"] == "PRE_MARKET" + assert row["venues"] == ["NTX"] + + +@pytest.mark.asyncio +async def test_db_insufficient_rows_raises(monkeypatch): + from app.services import kr_hourly_candles_read_service as svc + + symbol = "005930" + + class DummyDB: + async def execute(self, query, params=None): + sql = str(getattr(query, "text", query)) + if "FROM public.kr_symbol_universe" in sql and "LIMIT 1" in sql: + return _ScalarResult(symbol) + if "FROM public.kr_symbol_universe" in sql and "WHERE symbol" in sql: + return _MappingsResult( + [ + { + "symbol": symbol, + "nxt_eligible": False, + "is_active": True, + } + ] + ) + if "FROM public.kr_candles_1h" in sql: + return _MappingsResult([]) + if "FROM public.kr_candles_1m" in sql: + return _MappingsResult([]) + raise AssertionError(f"unexpected sql: {sql}") + + monkeypatch.setattr( + svc, "AsyncSessionLocal", lambda: DummySessionManager(DummyDB()) + ) + kis = SimpleNamespace(inquire_minute_chart=AsyncMock(return_value=pd.DataFrame())) + monkeypatch.setattr(svc, "KISClient", lambda: kis) + + with pytest.raises(ValueError, match="DB does not have enough KR 1h candles"): + await svc.read_kr_hourly_candles_1h( + symbol=symbol, + count=2, + end_date=None, + now_kst=_dt_kst(2026, 2, 23, 10, 0, 0), + ) + + +@pytest.mark.asyncio +async def test_api_partial_failure_raises(monkeypatch): + from app.services import kr_hourly_candles_read_service as svc + + symbol = "005930" + now_kst = _dt_kst(2026, 2, 23, 9, 0, 0) + + class DummyDB: + async def execute(self, query, params=None): + sql = str(getattr(query, "text", query)) + if "FROM public.kr_symbol_universe" in sql and "LIMIT 1" in sql: + return _ScalarResult(symbol) + if "FROM public.kr_symbol_universe" in sql and "WHERE symbol" in sql: + return _MappingsResult( + [ + { + "symbol": symbol, + "nxt_eligible": True, + "is_active": True, + } + ] + ) + if "FROM public.kr_candles_1h" in sql: + return _MappingsResult([]) + if "FROM public.kr_candles_1m" in sql: + return _MappingsResult([]) + raise AssertionError(f"unexpected sql: {sql}") + + monkeypatch.setattr( + svc, "AsyncSessionLocal", lambda: DummySessionManager(DummyDB()) + ) + + async def _fail_on_nx(*, market, **_): + if market == "NX": + raise RuntimeError("NX failed") + return pd.DataFrame() + + kis = SimpleNamespace(inquire_minute_chart=AsyncMock(side_effect=_fail_on_nx)) + monkeypatch.setattr(svc, "KISClient", lambda: kis) + + with pytest.raises(RuntimeError, match="NX failed"): + await svc.read_kr_hourly_candles_1h( + symbol=symbol, + count=1, + end_date=None, + now_kst=now_kst, + ) diff --git a/tests/test_mcp_server_tools.py b/tests/test_mcp_server_tools.py index 087575e9..e2d6a2b4 100644 --- a/tests/test_mcp_server_tools.py +++ b/tests/test_mcp_server_tools.py @@ -1,7 
+1,10 @@ +import dataclasses +import datetime import json import logging -from datetime import date, timedelta -from types import SimpleNamespace +from collections.abc import Callable +from datetime import date +from typing import Any, cast from unittest.mock import AsyncMock import httpx @@ -42,10 +45,12 @@ class DummyMCP: def __init__(self) -> None: - self.tools: dict[str, object] = {} + self.tools: dict[str, Callable[..., Any]] = {} def tool(self, name: str, description: str): - def decorator(func): + del description + + def decorator(func: Callable[..., Any]) -> Callable[..., Any]: self.tools[name] = func return func @@ -91,9 +96,9 @@ async def execute(self, query): _KR_SYNC_HINT = "uv run python scripts/sync_kr_symbol_universe.py" -def build_tools() -> dict[str, object]: +def build_tools() -> dict[str, Callable[..., Any]]: mcp = DummyMCP() - register_all_tools(mcp) + register_all_tools(cast(Any, mcp)) return mcp.tools @@ -171,7 +176,10 @@ def _patch_httpx_async_client( monkeypatch.setattr(module.httpx, "AsyncClient", async_client_class) -def _patch_yf_ticker(monkeypatch: pytest.MonkeyPatch, ticker_factory: object) -> None: +def _patch_yf_ticker( + monkeypatch: pytest.MonkeyPatch, + ticker_factory: Callable[[str], object], +) -> None: def wrapped_ticker(symbol, session=None): assert session is not None return ticker_factory(symbol) @@ -1419,427 +1427,83 @@ async def test_get_ohlcv_crypto_period_1h(monkeypatch): @pytest.mark.asyncio -async def test_get_ohlcv_kr_equity_period_1h(monkeypatch): +async def test_get_ohlcv_kr_equity_period_1h_includes_session_and_venues(monkeypatch): tools = build_tools() - df = _single_row_df() - monkeypatch.setattr(settings, "kis_ohlcv_cache_enabled", False, raising=False) - route_mock = AsyncMock(return_value="UN") - monkeypatch.setattr(market_data_quotes, "_resolve_kr_intraday_route", route_mock) - - class DummyKISClient: - async def inquire_time_dailychartprice( - self, code, market, n, end_date=None, end_time=None - ): - del code, market, n, end_date, end_time - return df + df = pd.DataFrame( + [ + { + "datetime": pd.Timestamp("2026-02-23 09:00:00"), + "date": date(2026, 2, 23), + "time": datetime.time(9, 0, 0), + "open": 100.0, + "high": 110.0, + "low": 90.0, + "close": 105.0, + "volume": 1000, + "value": 105000.0, + "session": "REGULAR", + "venues": ["KRX", "NTX"], + } + ] + ) + read_mock = AsyncMock(return_value=df) + monkeypatch.setattr(market_data_quotes, "read_kr_hourly_candles_1h", read_mock) - _patch_runtime_attr(monkeypatch, "KISClient", DummyKISClient) result = await tools["get_ohlcv"]("005930", market="kr", count=50, period="1h") + read_mock.assert_awaited_once_with(symbol="005930", count=50, end_date=None) assert result["instrument_type"] == "equity_kr" assert result["period"] == "1h" assert result["source"] == "kis" - route_mock.assert_awaited_once_with("005930") - - -@pytest.mark.asyncio -async def test_get_ohlcv_kr_equity_period_1h_backfills_multiple_days(monkeypatch): - tools = build_tools() - target_day = date(2026, 2, 19) - monkeypatch.setattr(settings, "kis_ohlcv_cache_enabled", False, raising=False) - route_mock = AsyncMock(return_value="UN") - monkeypatch.setattr(market_data_quotes, "_resolve_kr_intraday_route", route_mock) - calls: list[tuple[date | None, str | None, str]] = [] - - class DummyKISClient: - async def inquire_time_dailychartprice( - self, code, market, n, end_date=None, end_time=None - ): - del code - calls.append((end_date, end_time, market)) - requested_day = end_date or target_day - assert n >= 1 - return 
pd.DataFrame( - [ - { - "datetime": pd.Timestamp(f"{requested_day} 09:00:00"), - "date": requested_day, - "time": pd.Timestamp("2026-01-01 09:00:00").time(), - "open": 100.0, - "high": 101.0, - "low": 99.0, - "close": 100.5, - "volume": 1000, - "value": 100500.0, - }, - { - "datetime": pd.Timestamp(f"{requested_day} 10:00:00"), - "date": requested_day, - "time": pd.Timestamp("2026-01-01 10:00:00").time(), - "open": 100.5, - "high": 102.0, - "low": 100.0, - "close": 101.5, - "volume": 1100, - "value": 111650.0, - }, - ] - ) - - _patch_runtime_attr(monkeypatch, "KISClient", DummyKISClient) - result = await tools["get_ohlcv"]( - "005930", - market="kr", - count=4, - period="1h", - end_date=target_day.isoformat(), - ) - - assert len(calls) >= 3 - assert calls[0][2] == "UN" - assert calls[0][0] == target_day - assert calls[1][0] == target_day - assert calls[0][1] == "200000" - assert calls[1][1] is not None - assert calls[1][1] < calls[0][1] - assert any( - day == target_day - timedelta(days=1) and end_time == "200000" - for day, end_time, _ in calls - ) - assert len(result["rows"]) == 4 - assert result["period"] == "1h" - assert result["instrument_type"] == "equity_kr" - route_mock.assert_awaited_once_with("005930") - - -@pytest.mark.asyncio -async def test_get_ohlcv_kr_equity_period_1h_backfill_uses_j_close_for_history( - monkeypatch, -): - tools = build_tools() - target_day = date(2026, 2, 19) - monkeypatch.setattr(settings, "kis_ohlcv_cache_enabled", False, raising=False) - monkeypatch.setattr( - market_data_quotes, - "_resolve_kr_intraday_route", - AsyncMock(return_value="J"), - ) - calls: list[tuple[date | None, str | None, str]] = [] - - class DummyKISClient: - async def inquire_time_dailychartprice( - self, code, market, n, end_date=None, end_time=None - ): - del code - calls.append((end_date, end_time, market)) - requested_day = end_date or target_day - assert n >= 1 - return pd.DataFrame( - [ - { - "datetime": pd.Timestamp(f"{requested_day} 09:00:00"), - "date": requested_day, - "time": pd.Timestamp("2026-01-01 09:00:00").time(), - "open": 100.0, - "high": 101.0, - "low": 99.0, - "close": 100.5, - "volume": 1000, - "value": 100500.0, - }, - { - "datetime": pd.Timestamp(f"{requested_day} 10:00:00"), - "date": requested_day, - "time": pd.Timestamp("2026-01-01 10:00:00").time(), - "open": 100.5, - "high": 102.0, - "low": 100.0, - "close": 101.5, - "volume": 1100, - "value": 111650.0, - }, - ] - ) - - _patch_runtime_attr(monkeypatch, "KISClient", DummyKISClient) - result = await tools["get_ohlcv"]( - "005930", - market="kr", - count=4, - period="1h", - end_date=target_day.isoformat(), - ) - - assert len(calls) >= 2 - assert calls[0][2] == "J" - assert calls[0][1] == "153000" - assert any( - day == target_day - timedelta(days=1) and end_time == "153000" - for day, end_time, _ in calls - ) - assert len(result["rows"]) == 4 - assert result["period"] == "1h" - assert result["instrument_type"] == "equity_kr" - - -@pytest.mark.asyncio -async def test_get_ohlcv_kr_1h_paginates_within_same_day(monkeypatch): - tools = build_tools() - target_day = date(2026, 2, 19) - monkeypatch.setattr(settings, "kis_ohlcv_cache_enabled", False, raising=False) - monkeypatch.setattr( - market_data_quotes, - "_resolve_kr_intraday_route", - AsyncMock(return_value="UN"), - ) - calls: list[tuple[date | None, str | None, str]] = [] - - class DummyKISClient: - async def inquire_time_dailychartprice( - self, code, market, n, end_date=None, end_time=None - ): - del code, n - calls.append((end_date, end_time, market)) - 
requested_day = end_date or target_day - if end_time is None or end_time >= "180000": - hours = (19, 18) - else: - hours = (17, 16) - return pd.DataFrame( - [ - { - "datetime": pd.Timestamp(f"{requested_day} {hour:02d}:00:00"), - "date": requested_day, - "time": pd.Timestamp(f"2026-01-01 {hour:02d}:00:00").time(), - "open": float(hour), - "high": float(hour) + 0.5, - "low": float(hour) - 0.5, - "close": float(hour) + 0.25, - "volume": 1000 + hour, - "value": float((1000 + hour) * hour), - } - for hour in hours - ] - ) - - _patch_runtime_attr(monkeypatch, "KISClient", DummyKISClient) - result = await tools["get_ohlcv"]( - "005930", - market="kr", - count=4, - period="1h", - end_date=target_day.isoformat(), - ) - - assert len(calls) >= 2 - assert len(result["rows"]) == 4 - assert all(day == target_day for day, _, _ in calls) - assert all(market == "UN" for _, _, market in calls) - assert calls[0][1] == "200000" - assert calls[1][1] is not None - assert calls[1][1] < calls[0][1] - - -@pytest.mark.asyncio -async def test_get_ohlcv_kr_1h_stops_when_cursor_stalls(monkeypatch): - tools = build_tools() - target_day = date(2026, 2, 19) - monkeypatch.setattr(settings, "kis_ohlcv_cache_enabled", False, raising=False) - monkeypatch.setattr( - market_data_quotes, - "_resolve_kr_intraday_route", - AsyncMock(return_value="UN"), - ) - calls: list[tuple[date | None, str | None, str]] = [] - - class DummyKISClient: - async def inquire_time_dailychartprice( - self, code, market, n, end_date=None, end_time=None - ): - del code, n - calls.append((end_date, end_time, market)) - requested_day = end_date or target_day - if requested_day != target_day: - return pd.DataFrame() - return pd.DataFrame( - [ - { - "datetime": pd.Timestamp(f"{requested_day} 10:00:00"), - "date": requested_day, - "time": pd.Timestamp("2026-01-01 10:00:00").time(), - "open": 100.0, - "high": 101.0, - "low": 99.0, - "close": 100.5, - "volume": 1000, - "value": 100500.0, - }, - { - "datetime": pd.Timestamp(f"{requested_day} 09:00:00"), - "date": requested_day, - "time": pd.Timestamp("2026-01-01 09:00:00").time(), - "open": 99.5, - "high": 100.5, - "low": 99.0, - "close": 100.0, - "volume": 900, - "value": 90000.0, - }, - ] - ) - - _patch_runtime_attr(monkeypatch, "KISClient", DummyKISClient) - result = await tools["get_ohlcv"]( - "005930", - market="kr", - count=6, - period="1h", - end_date=target_day.isoformat(), - ) - - same_day_calls = [call for call in calls if call[0] == target_day] - assert len(same_day_calls) == 2 - assert same_day_calls[0][1] == "200000" - assert same_day_calls[1][1] is not None - assert same_day_calls[1][1] < same_day_calls[0][1] - assert len(result["rows"]) == 2 + assert result["rows"] + row = result["rows"][0] + assert row["session"] == "REGULAR" + assert row["venues"] == ["KRX", "NTX"] @pytest.mark.asyncio -async def test_get_ohlcv_kr_1h_respects_daily_page_limit(monkeypatch): +async def test_get_ohlcv_kr_1h_does_not_use_kis_ohlcv_cache(monkeypatch): tools = build_tools() - target_day = date(2026, 2, 19) - monkeypatch.setattr(settings, "kis_ohlcv_cache_enabled", False, raising=False) - monkeypatch.setattr( - market_data_quotes, - "_resolve_kr_intraday_route", - AsyncMock(return_value="UN"), - ) - daily_call_counts: dict[date, int] = {} - - class DummyKISClient: - async def inquire_time_dailychartprice( - self, code, market, n, end_date=None, end_time=None - ): - del code, market, n, end_time - requested_day = end_date or target_day - index = daily_call_counts.get(requested_day, 0) - 
daily_call_counts[requested_day] = index + 1 - hour = 20 - index - return pd.DataFrame( - [ - { - "datetime": pd.Timestamp(f"{requested_day} {hour:02d}:00:00"), - "date": requested_day, - "time": pd.Timestamp(f"2026-01-01 {hour:02d}:00:00").time(), - "open": 100.0, - "high": 101.0, - "low": 99.0, - "close": 100.5, - "volume": 1000, - "value": 100500.0, - } - ] - ) - - _patch_runtime_attr(monkeypatch, "KISClient", DummyKISClient) - result = await tools["get_ohlcv"]( - "005930", - market="kr", - count=25, - period="1h", - end_date=target_day.isoformat(), - ) - - assert daily_call_counts - assert max(daily_call_counts.values()) == 10 - assert all(count <= 10 for count in daily_call_counts.values()) - assert len(result["rows"]) == 25 - - -@pytest.mark.asyncio -async def test_resolve_kr_intraday_route_universe_empty_has_sync_hint(monkeypatch): - route_db = _DummyRouteDB([None, None]) - monkeypatch.setattr( - market_data_quotes, - "AsyncSessionLocal", - lambda: DummySessionManager(route_db), + df = pd.DataFrame( + [ + { + "datetime": pd.Timestamp("2026-02-23 09:00:00"), + "date": date(2026, 2, 23), + "time": datetime.time(9, 0, 0), + "open": 100.0, + "high": 110.0, + "low": 90.0, + "close": 105.0, + "volume": 1000, + "value": 105000.0, + "session": "REGULAR", + "venues": ["KRX"], + } + ] ) - - with pytest.raises(ValueError) as exc_info: - await market_data_quotes._resolve_kr_intraday_route("005930") - - assert route_db.calls == 2 - assert "kr_symbol_universe is empty" in str(exc_info.value) - assert _KR_SYNC_HINT in str(exc_info.value) - - -@pytest.mark.asyncio -async def test_resolve_kr_intraday_route_unregistered_has_sync_hint(monkeypatch): - route_db = _DummyRouteDB([None, "005930"]) monkeypatch.setattr( - market_data_quotes, - "AsyncSessionLocal", - lambda: DummySessionManager(route_db), + market_data_quotes.kis_ohlcv_cache, + "get_candles", + AsyncMock(side_effect=AssertionError("KR 1h must not use kis_ohlcv_cache")), ) - - with pytest.raises(ValueError) as exc_info: - await market_data_quotes._resolve_kr_intraday_route("005930") - - assert route_db.calls == 2 - assert "is not registered in kr_symbol_universe" in str(exc_info.value) - assert _KR_SYNC_HINT in str(exc_info.value) - - -@pytest.mark.asyncio -async def test_resolve_kr_intraday_route_inactive_has_sync_hint(monkeypatch): - route_db = _DummyRouteDB([SimpleNamespace(is_active=False, nxt_eligible=True)]) monkeypatch.setattr( market_data_quotes, - "AsyncSessionLocal", - lambda: DummySessionManager(route_db), - ) - - with pytest.raises(ValueError) as exc_info: - await market_data_quotes._resolve_kr_intraday_route("005930") - - assert route_db.calls == 1 - assert "is inactive in kr_symbol_universe" in str(exc_info.value) - assert _KR_SYNC_HINT in str(exc_info.value) - - -@pytest.mark.asyncio -@pytest.mark.parametrize( - ("nxt_eligible", "expected_route"), - ((True, "UN"), (False, "J")), -) -async def test_resolve_kr_intraday_route_returns_expected_market( - monkeypatch, - nxt_eligible, - expected_route, -): - route_db = _DummyRouteDB( - [SimpleNamespace(is_active=True, nxt_eligible=nxt_eligible)] - ) - monkeypatch.setattr( - market_data_quotes, - "AsyncSessionLocal", - lambda: DummySessionManager(route_db), + "read_kr_hourly_candles_1h", + AsyncMock(return_value=df), ) - route = await market_data_quotes._resolve_kr_intraday_route("005930") + result = await tools["get_ohlcv"]("005930", market="kr", count=1, period="1h") - assert route == expected_route - assert route_db.calls == 1 + assert result["period"] == "1h" + assert 
result["instrument_type"] == "equity_kr" @pytest.mark.asyncio async def test_get_ohlcv_kr_1h_universe_empty_returns_error_payload(monkeypatch): tools = build_tools() - monkeypatch.setattr(settings, "kis_ohlcv_cache_enabled", False, raising=False) monkeypatch.setattr( market_data_quotes, - "_resolve_kr_intraday_route", + "read_kr_hourly_candles_1h", AsyncMock( side_effect=ValueError( f"kr_symbol_universe is empty. Sync required: {_KR_SYNC_HINT}" @@ -1858,10 +1522,9 @@ async def test_get_ohlcv_kr_1h_universe_empty_returns_error_payload(monkeypatch) @pytest.mark.asyncio async def test_get_ohlcv_kr_1h_unregistered_symbol_returns_error_payload(monkeypatch): tools = build_tools() - monkeypatch.setattr(settings, "kis_ohlcv_cache_enabled", False, raising=False) monkeypatch.setattr( market_data_quotes, - "_resolve_kr_intraday_route", + "read_kr_hourly_candles_1h", AsyncMock( side_effect=ValueError( "KR symbol '005930' is not registered in kr_symbol_universe. " @@ -1881,10 +1544,9 @@ async def test_get_ohlcv_kr_1h_unregistered_symbol_returns_error_payload(monkeyp @pytest.mark.asyncio async def test_get_ohlcv_kr_1h_inactive_symbol_returns_error_payload(monkeypatch): tools = build_tools() - monkeypatch.setattr(settings, "kis_ohlcv_cache_enabled", False, raising=False) monkeypatch.setattr( market_data_quotes, - "_resolve_kr_intraday_route", + "read_kr_hourly_candles_1h", AsyncMock( side_effect=ValueError( "KR symbol '005930' is inactive in kr_symbol_universe. " @@ -1901,63 +1563,6 @@ async def test_get_ohlcv_kr_1h_inactive_symbol_returns_error_payload(monkeypatch assert _KR_SYNC_HINT in result["error"] -@pytest.mark.asyncio -async def test_get_ohlcv_kr_1h_mock_unsupported_returns_error_payload(monkeypatch): - tools = build_tools() - monkeypatch.setattr(settings, "kis_ohlcv_cache_enabled", False, raising=False) - monkeypatch.setattr( - market_data_quotes, - "_resolve_kr_intraday_route", - AsyncMock(return_value="UN"), - ) - - class DummyKISClient: - async def inquire_time_dailychartprice( - self, code, market, n, end_date=None, end_time=None - ): - del code, market, n, end_date, end_time - raise RuntimeError( - "mock trading does not support inquire-time-dailychartprice" - ) - - _patch_runtime_attr(monkeypatch, "KISClient", DummyKISClient) - result = await tools["get_ohlcv"]("005930", market="kr", period="1h") - - assert result["source"] == "kis" - assert result["instrument_type"] == "equity_kr" - assert "mock" in result["error"].lower() - - -@pytest.mark.asyncio -async def test_get_ohlcv_kr_1h_opsq2001_field_missing_returns_error_payload( - monkeypatch, -): - tools = build_tools() - monkeypatch.setattr(settings, "kis_ohlcv_cache_enabled", False, raising=False) - monkeypatch.setattr( - market_data_quotes, - "_resolve_kr_intraday_route", - AsyncMock(return_value="UN"), - ) - - class DummyKISClient: - async def inquire_time_dailychartprice( - self, code, market, n, end_date=None, end_time=None - ): - del code, market, n, end_date, end_time - raise RuntimeError( - "OPSQ2001 ERROR INPUT FIELD NOT FOUND [FID_FAKE_TICK_INCU_YN]" - ) - - _patch_runtime_attr(monkeypatch, "KISClient", DummyKISClient) - result = await tools["get_ohlcv"]("005930", market="kr", period="1h") - - assert result["source"] == "kis" - assert result["instrument_type"] == "equity_kr" - assert "OPSQ2001" in result["error"] - assert "FID_FAKE_TICK_INCU_YN" in result["error"] - - @pytest.mark.asyncio async def test_get_ohlcv_with_end_date(monkeypatch): tools = build_tools() @@ -3485,8 +3090,16 @@ def test_bullish_divergence_detected(self): n 
= 30 close = pd.Series([100.0] * n) volume = pd.Series([1000.0] * n) - close.iloc[-5:] = [100, 98, 96, 98, 95] - volume.iloc[-5:] = [1000, 1000, 1000, 10000, 1000] + close.iloc[-5:] = pd.Series( + [100.0, 98.0, 96.0, 98.0, 95.0], + index=close.iloc[-5:].index, + dtype=float, + ) + volume.iloc[-5:] = pd.Series( + [1000.0, 1000.0, 1000.0, 10000.0, 1000.0], + index=volume.iloc[-5:].index, + dtype=float, + ) result = market_data_indicators._calculate_obv(close, volume) @@ -3496,8 +3109,16 @@ def test_bearish_divergence_detected(self): n = 30 close = pd.Series([95.0] * n) volume = pd.Series([1000.0] * n) - close.iloc[-5:] = [95, 97, 99, 97, 100] - volume.iloc[-5:] = [1000, 1000, 1000, 10000, 1000] + close.iloc[-5:] = pd.Series( + [95.0, 97.0, 99.0, 97.0, 100.0], + index=close.iloc[-5:].index, + dtype=float, + ) + volume.iloc[-5:] = pd.Series( + [1000.0, 1000.0, 1000.0, 10000.0, 1000.0], + index=volume.iloc[-5:].index, + dtype=float, + ) result = market_data_indicators._calculate_obv(close, volume) @@ -3648,7 +3269,7 @@ def test_computes_multiple_indicators(self): def test_computes_all_indicators(self): df = _sample_ohlcv_df(250) - all_indicators = [ + all_indicators: list[market_data_indicators.IndicatorType] = [ "sma", "ema", "rsi", @@ -4757,11 +4378,13 @@ def __init__(self, json_data, status_code=200): def raise_for_status(self): if self.status_code >= 400: - raise httpx.HTTPStatusError( - "error", - request=None, - response=self, # type: ignore[arg-type] + request = httpx.Request("GET", "https://example.invalid") + response = httpx.Response( + status_code=self.status_code, + request=request, + json=self._json_data, ) + raise httpx.HTTPStatusError("error", request=request, response=response) def json(self): return self._json_data @@ -4791,16 +4414,23 @@ async def fake_get(self_cli, url, **kwargs): def _patch_yfinance(self, monkeypatch, last_price=5500.0, prev_close=5450.0): """Patch yfinance for US index.""" + @dataclasses.dataclass(frozen=True) class MockFastInfo: - pass - - info = MockFastInfo() - info.last_price = last_price - info.regular_market_previous_close = prev_close - info.open = 5460.0 - info.day_high = 5510.0 - info.day_low = 5430.0 - info.last_volume = 3_500_000_000 + last_price: float + regular_market_previous_close: float + open: float + day_high: float + day_low: float + last_volume: int + + info = MockFastInfo( + last_price=last_price, + regular_market_previous_close=prev_close, + open=5460.0, + day_high=5510.0, + day_low=5430.0, + last_volume=3_500_000_000, + ) class MockTicker: fast_info = info @@ -7494,7 +7124,7 @@ async def test_place_order_nyse_exchange_code(monkeypatch): """Test that NYSE stocks (e.g. TSM) use correct exchange code instead of hardcoded NASD.""" tools = build_tools() - buy_calls: list[dict] = [] + buy_calls: list[dict[str, object]] = [] class MockKISClient: async def inquire_integrated_margin(self): @@ -7610,12 +7240,6 @@ def test_none_rsi_returns_equal_weights(self): class TestPlaceOrderHighAmount: """Tests for place_order with high-amount orders.""" - @staticmethod - def build_tools(): - mcp = DummyMCP() - register_all_tools(mcp) - return mcp.tools - @pytest.mark.asyncio async def test_get_current_price_for_order_crypto_bypasses_ticker_cache( self, monkeypatch