Skip to content
Open
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.PHONY: help install install-dev test test-unit test-integration test-cov test-fast test-watch lint format typecheck security clean dev taskiq-worker taskiq-scheduler docker-build docker-run docker-test sync-kr-symbol-universe sync-upbit-symbol-universe sync-us-symbol-universe
.PHONY: help install install-dev test test-unit test-integration test-cov test-fast test-watch lint format typecheck security clean dev taskiq-worker taskiq-scheduler docker-build docker-run docker-test sync-kr-symbol-universe sync-upbit-symbol-universe sync-us-symbol-universe sync-kr-candles-backfill sync-kr-candles-incremental

help: ## Show this help message
@echo "Available commands:"
Expand Down Expand Up @@ -72,6 +72,12 @@ sync-upbit-symbol-universe: ## Sync Upbit symbol universe for crypto symbol reso
sync-us-symbol-universe: ## Sync US symbol universe for US symbol/exchange resolution
uv run python scripts/sync_us_symbol_universe.py

sync-kr-candles-backfill: ## Backfill KR candles for recent sessions
uv run python scripts/sync_kr_candles.py --mode backfill --sessions 10

sync-kr-candles-incremental: ## Incremental KR candles sync (venue-gated)
uv run python scripts/sync_kr_candles.py --mode incremental

docker-build: ## Build Docker image
docker build -t auto-trader .

Expand Down
211 changes: 211 additions & 0 deletions alembic/versions/87541fdbc954_add_kr_candles_timescale.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
"""add_kr_candles_timescale

Revision ID: 87541fdbc954
Revises: 9f2c6db7a41e
Create Date: 2026-02-23 15:14:41.031755

"""

from collections.abc import Sequence

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "87541fdbc954"
down_revision: str | Sequence[str] | None = "9f2c6db7a41e"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
"""Upgrade schema."""
op.execute(
"""
DO $$
DECLARE
v_extversion TEXT;
v_version_core TEXT;
v_parts TEXT[];
v_major INTEGER;
v_minor INTEGER;
v_patch INTEGER;
BEGIN
SELECT extversion
INTO v_extversion
FROM pg_extension
WHERE extname = 'timescaledb';

IF v_extversion IS NULL THEN
RAISE EXCEPTION 'timescaledb extension is not installed';
END IF;

v_version_core := split_part(v_extversion, '-', 1);
v_parts := regexp_split_to_array(v_version_core, '\\.');

v_major := COALESCE(v_parts[1], '0')::INTEGER;
v_minor := COALESCE(v_parts[2], '0')::INTEGER;
v_patch := COALESCE(v_parts[3], '0')::INTEGER;

IF (v_major, v_minor, v_patch) < (2, 8, 1) THEN
RAISE EXCEPTION
'timescaledb extension version % is below required minimum 2.8.1',
v_extversion;
END IF;
END
$$
"""
)

op.execute(
"""
CREATE TABLE public.kr_candles_1m (
time TIMESTAMPTZ NOT NULL,
symbol TEXT NOT NULL,
venue TEXT NOT NULL,
open NUMERIC NOT NULL,
high NUMERIC NOT NULL,
low NUMERIC NOT NULL,
close NUMERIC NOT NULL,
volume NUMERIC NOT NULL,
value NUMERIC NOT NULL,
CONSTRAINT ck_kr_candles_1m_venue CHECK (venue IN ('KRX', 'NTX')),
CONSTRAINT uq_kr_candles_1m_time_symbol_venue UNIQUE (time, symbol, venue)
)
"""
)

op.execute(
"""
SELECT create_hypertable(
'public.kr_candles_1m',
'time',
migrate_data => TRUE
)
"""
)

op.execute(
"""
CREATE INDEX ix_kr_candles_1m_symbol_time_desc
ON public.kr_candles_1m (symbol, time DESC)
"""
)

op.execute(
"""
CREATE MATERIALIZED VIEW public.kr_candles_1h
WITH (
timescaledb.continuous,
timescaledb.materialized_only = false
)
AS
SELECT
time_bucket(INTERVAL '1 hour', time, 'Asia/Seoul') AS bucket,
symbol,
FIRST(
open,
((extract(epoch from time) * 1000000)::bigint * 2
+ CASE WHEN venue = 'KRX' THEN 0 ELSE 1 END)
) AS open,
MAX(high) AS high,
MIN(low) AS low,
LAST(
close,
((extract(epoch from time) * 1000000)::bigint * 2
+ CASE WHEN venue = 'KRX' THEN 1 ELSE 0 END)
) AS close,
SUM(volume) AS volume,
SUM(value) AS value,
array_agg(DISTINCT venue ORDER BY venue) AS venues
FROM public.kr_candles_1m
GROUP BY bucket, symbol
WITH NO DATA
"""
)

op.execute(
"""
DO $$
BEGIN
IF to_regclass('public.kr_candles_1h') IS NOT NULL THEN
EXECUTE $sql$
SELECT remove_continuous_aggregate_policy(
'public.kr_candles_1h',
if_exists => TRUE
)
$sql$;

EXECUTE $sql$
SELECT add_continuous_aggregate_policy(
'public.kr_candles_1h',
start_offset => INTERVAL '2 days',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '5 minutes'
)
$sql$;
END IF;
END
$$
"""
)

op.execute(
"""
DO $$
DECLARE
v_start TIMESTAMPTZ;
v_end TIMESTAMPTZ;
v_refresh_end TIMESTAMPTZ;
BEGIN
IF to_regclass('public.kr_candles_1h') IS NULL THEN
RETURN;
END IF;

SELECT MIN(time), MAX(time)
INTO v_start, v_end
FROM public.kr_candles_1m;

IF v_start IS NOT NULL AND v_end IS NOT NULL THEN
v_refresh_end := LEAST(
v_end + INTERVAL '1 hour',
date_trunc('hour', now() - INTERVAL '1 hour')
);

IF v_refresh_end <= v_start THEN
RETURN;
END IF;

CALL refresh_continuous_aggregate(
'public.kr_candles_1h',
v_start,
v_refresh_end
);
END IF;
END
$$
"""
)


def downgrade() -> None:
"""Downgrade schema."""
op.execute(
"""
DO $$
BEGIN
IF to_regclass('public.kr_candles_1h') IS NOT NULL THEN
EXECUTE $sql$
SELECT remove_continuous_aggregate_policy(
'public.kr_candles_1h',
if_exists => TRUE
)
$sql$;
END IF;
END
$$
"""
)

op.execute("DROP MATERIALIZED VIEW IF EXISTS public.kr_candles_1h")
op.execute("DROP INDEX IF EXISTS public.ix_kr_candles_1m_symbol_time_desc")
op.execute("DROP TABLE IF EXISTS public.kr_candles_1m")
64 changes: 64 additions & 0 deletions alembic/versions/d31f0a2b4c6d_add_kr_candles_retention_policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from collections.abc import Sequence

from alembic import op

revision: str = "d31f0a2b4c6d"
down_revision: str | Sequence[str] | None = "87541fdbc954"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
op.execute(
"""
DO $$
BEGIN
IF to_regclass('public.kr_candles_1m') IS NOT NULL THEN
PERFORM remove_retention_policy(
'public.kr_candles_1m',
if_exists => TRUE
);
PERFORM add_retention_policy(
'public.kr_candles_1m',
INTERVAL '90 days'
);
END IF;

IF to_regclass('public.kr_candles_1h') IS NOT NULL THEN
PERFORM remove_retention_policy(
'public.kr_candles_1h',
if_exists => TRUE
);
PERFORM add_retention_policy(
'public.kr_candles_1h',
INTERVAL '90 days'
);
END IF;
END
$$
"""
)


def downgrade() -> None:
op.execute(
"""
DO $$
BEGIN
IF to_regclass('public.kr_candles_1m') IS NOT NULL THEN
PERFORM remove_retention_policy(
'public.kr_candles_1m',
if_exists => TRUE
);
END IF;

IF to_regclass('public.kr_candles_1h') IS NOT NULL THEN
PERFORM remove_retention_policy(
'public.kr_candles_1h',
if_exists => TRUE
);
END IF;
END
$$
"""
)
28 changes: 28 additions & 0 deletions app/jobs/kr_candles.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from __future__ import annotations

import logging

from app.services.kr_candles_sync_service import sync_kr_candles

logger = logging.getLogger(__name__)


async def run_kr_candles_sync(
*,
mode: str,
sessions: int = 10,
user_id: int = 1,
) -> dict[str, object]:
try:
result = await sync_kr_candles(mode=mode, sessions=sessions, user_id=user_id)
return {
"status": "completed",
**result,
}
except Exception as exc:
logger.error("KR candles sync failed: %s", exc, exc_info=True)
return {
"status": "failed",
"mode": mode,
"error": str(exc),
}
21 changes: 13 additions & 8 deletions app/mcp_server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,19 @@ MCP tools (market data, portfolio, order execution) exposed via `fastmcp`.
- `4h`: crypto only
- `1h`: KR/US equity + crypto
- US OHLCV remains Yahoo-based (`app.services.brokers.yahoo.client.fetch_ohlcv`)
- KR `1h` includes in-progress (partial) hourly candle
- KR `1h` resolves symbol route from `kr_symbol_universe` (`nxt_eligible=true` -> `UN`, else `J`)
- KR `1h` returns an explicit error when symbol is missing/inactive in `kr_symbol_universe`
- KR `1h` keeps `UN` empty-result behavior without `J` fallback
- KR `1h` expands intraday coverage with same-day `end_time` cursor pagination before moving to prior dates
- KR `1h` same-day pagination is capped at `10` internal API calls per trading day
- KR `1h` historical backfill uses route-specific cutoff (`J=153000`, `UN=200000`)
- KR `1h` prerequisite: run `make sync-kr-symbol-universe` (or `uv run python scripts/sync_kr_symbol_universe.py`) right after migrations
- KR `1h` history is DB-first from Timescale continuous aggregate `public.kr_candles_1h`
- KR `1h` includes the in-progress (partial) hourly candle by rebuilding the current hour in-memory from:
- `public.kr_candles_1m` (minute DB) + KIS minute API (up to 30 rows)
- KIS minute venues are merged with strict dedup to prevent double-counting (API overwrites DB per minute+venue)
- KIS minute API call plan (KST):
- `09:00 <= now < 15:35`: call KRX (`J`) + NTX (`NX`) in parallel when `nxt_eligible=true` (15:35 delay defense)
- `08:00 <= now < 09:00`: call NTX (`NX`) only when `nxt_eligible=true`
- `15:35 <= now < 20:00`: call NTX (`NX`) only when `nxt_eligible=true`
- When `end_date` is in the past: DB-only (0 API calls)
- KR `1h` returns an explicit error when symbol is missing/inactive in `kr_symbol_universe` (used for `nxt_eligible`)
- KR `1h` does not use Redis OHLCV cache (`kis_ohlcv_cache`)
- KR `1h` treats partial API failure (KRX/NTX) as a tool-level error (exception)
- KR `1h` response rows add `session` and `venues` fields (KR `1h` only; other market/period schemas unchanged)
- `get_indicators(symbol, indicators, market=None)`
- `get_volume_profile(symbol, market=None, period=60, bins=20)`
- `get_order_history(symbol=None, status="all", order_id=None, limit=50)`
Expand Down
Loading