feat: add KR OHLCV ingest pipeline with scheduled precompute #181
robin-watcha wants to merge 1 commit into main
Conversation
📝 Walkthrough

Introduces a comprehensive Korean OHLCV (Open-High-Low-Close-Volume) time-series infrastructure built on TimescaleDB. Includes three database migrations establishing tables and materialized views, new services for data normalization and storage, precomputation jobs with incremental and nightly scheduling, configuration flags for dual-route handling, and extensive test coverage for migrations, precomputation logic, and storage operations.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Job as KR OHLCV<br/>Precompute Job
    participant Store as kr_ohlcv<br/>timeseries_store
    participant DB as TimescaleDB<br/>(market_candles_1m_kr)
    participant CAgg as Continuous<br/>Aggregate<br/>(market_candles_1h_kr)
    participant KIS as KIS<br/>API Client
    Job->>Store: ensure_timescale_ready()
    Store->>DB: Check extension + tables exist
    DB-->>Store: OK
    Job->>KIS: Fetch intraday data<br/>(symbol, route, day)
    KIS-->>Job: OHLCV frame (minute bars)
    Job->>Job: Aggregate & normalize<br/>minute data
    Job->>Store: upsert_market_candles_1m()<br/>(symbol, frame, exchange, route)
    Store->>Store: Validate & normalize
    Store->>DB: INSERT/ON CONFLICT<br/>market_candles_1m_kr
    DB-->>Store: Upsert stats
    alt Data covers time range
        Job->>Store: refresh_market_candles_1h_kr()<br/>(start_ts, end_ts)
        Store->>CAgg: CALL refresh_continuous_aggregate()
        CAgg->>DB: Re-aggregate 1h buckets<br/>from market_candles_1m_kr
        DB-->>CAgg: Updated materialized view
    end
    Store-->>Job: Return stats (rows, min/max ts)
    Job-->>Job: Return status dict
```
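The "Aggregate & normalize minute data" step in the diagram maps naturally onto a pandas resample. A minimal illustrative helper (column names follow the frames used in this PR, but the function itself is not from the diff):

```python
import pandas as pd


def aggregate_minutes_to_hours(frame: pd.DataFrame) -> pd.DataFrame:
    """Roll minute bars up into hourly OHLCV buckets (illustrative sketch)."""
    indexed = frame.set_index("datetime").sort_index()
    hourly = indexed.resample("1h").agg(
        {"open": "first", "high": "max", "low": "min", "close": "last", "volume": "sum"}
    )
    # Hours with no trades produce all-NaN rows; drop them before returning.
    return hourly.dropna(subset=["open"]).reset_index()
```

The same roll-up happens server-side in the continuous aggregate; this is only the in-process equivalent used to reason about the job's behavior.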
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
PR Review: feat: add KR OHLCV ingest pipeline with scheduled precompute

Overall this is a well-structured addition with good separation of concerns (jobs/services/tasks layers), feature-flag gating for incremental rollout, and a solid suite of unit tests. Below are the issues I found, roughly ordered by severity.

🔴 Bugs / Correctness Issues

1. Route/Exchange inconsistency: The second migration's fail-fast guard accepts routes that include `NXT`, while the application-level route validation only accepts `J` and `NX`, so the database and application layers disagree about what a valid route is.

2. After the v2 migration, reads still target the legacy `market_candles_1m_kr` table rather than `market_candles_1m_kr_v2`.

3. Time string comparison for pagination termination:

```python
next_end_time = (oldest - datetime.timedelta(minutes=1)).strftime("%H%M%S")
if next_end_time < session_start:
    break
if next_end_time == end_time:
    break
```
🟠 Design / Architecture Issues

4. Three migrations instead of one: schema churn. Migrations 1 and 2 leave behind objects that the later migrations supersede.

6. Per-symbol CAGG refresh calls.

7. Millisecond offset hack for exchange priority in the CAGG:

```sql
first(open, ts + CASE WHEN exchange = 'NXT' THEN INTERVAL '1 millisecond' ELSE INTERVAL '0' END) AS open
```

This relies on no two candles from the same symbol being within 1 ms of each other. Since candles are floored to the minute in the normalization step, the assumption holds in practice, but the priority rule is implicit and easy to break silently.

🟡 Code Quality Issues

8. Double exception handling in task wrapper (dead code).
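The 1 ms offset in item 7 is just a way to encode exchange priority inside `first()`: NXT candles sort slightly later, so the other exchange's bar wins when both report the same minute. The same selection expressed directly, as a pandas sketch of the intended semantics (the priority map is an assumption, not taken from the PR):

```python
import pandas as pd

EXCHANGE_PRIORITY = {"KRX": 0, "NXT": 1}  # assumed priority; lower rank wins


def dedupe_by_exchange(frame: pd.DataFrame) -> pd.DataFrame:
    """Keep one candle per (symbol, ts), preferring the higher-priority exchange."""
    ranked = frame.assign(_rank=frame["exchange"].map(EXCHANGE_PRIORITY))
    ranked = ranked.sort_values(["symbol", "ts", "_rank"])
    return ranked.drop_duplicates(["symbol", "ts"], keep="first").drop(columns="_rank")
```

An explicit rank column like this (or a priority column in the table) would make the tie-break self-documenting, where the interval trick hides it inside the CAGG definition.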
9. In-memory metrics not usable in multi-worker deployments.

10.

```python
holdings = await KISClient().fetch_my_stocks()
```

11. NXT post-close offset of 4h30m is very wide.

12. Redundant active-users gate:

```python
if active_users:
    holdings = await KISClient().fetch_my_stocks()
```

The list itself is unused. The intent seems to be "only call KIS if there are active users", but the check is redundant given that the loop over holdings is a no-op when empty. Either remove the check or use the fetched list directly.

🟢 Positives
Summary
Codecov Report

❌ Patch coverage check failed.

Additional details and impacted files:

```
@@            Coverage Diff            @@
##             main     #181       +/-   ##
===========================================
+ Coverage   53.11%   64.62%   +11.51%
===========================================
  Files          82      177       +95
  Lines       12530    22420     +9890
===========================================
+ Hits         6655    14489     +7834
- Misses       5875     7931     +2056
```

☔ View full report in Codecov by Sentry.
d6febd0 to 7178085 (force-push)
PR Review: KR OHLCV Ingest Pipeline

Overall, this is a well-structured foundation for the TimescaleDB OHLCV ingest system. The exchange-priority trick for the CAGG (`market_candles_1h_kr`) aside, the issues below are roughly ordered by severity.

🔴 Bugs / Correctness Issues

1. The minute-level fallback reads from the legacy minute table:

```python
# kr_ohlcv_timeseries_store.py
minute_statement = text(
    """
    SELECT close
    FROM market_candles_1m_kr  -- should be market_candles_1m_kr_v2
    ...
    """
)
```

Once dual-write is disabled and data lives only in `market_candles_1m_kr_v2`, this fallback will silently stop finding rows.

2. Route validation rejects `NXT`:

```python
# kr_ohlcv_timeseries_store.py
_VALID_ROUTES = {"J", "NX"}  # ← "NXT" is absent
```

The migration validation check accepts `NXT`, so rows the database considers valid can be rejected at the application layer.

3. Fetch-success metric is recorded unconditionally:

```python
# jobs/kr_ohlcv_precompute.py _sync_symbol_minutes()
record_fetch_success(route_market)  # ← called unconditionally
if merged.empty:
    continue
```

A non-trading day (or a day where the API returns nothing) will increment the success counter anyway. Move the metric call below the `merged.empty` guard.

🟠 Migration / Schema Issues

4. Legacy tables are never dropped. After three migrations, the schema contains three overlapping tables (`market_candles_1m`, `market_candles_1m_kr`, and `market_candles_1m_kr_v2`). Migrations 1 and 2's primary tables are superseded but not dropped. They waste storage and hypertable resources. If this is a fresh deployment (no production data yet), consider consolidating all three migrations into one. If not, add a follow-up migration that drops the unused tables after the cutover window.

5. Incomplete downgrade in migration 2. The downgrade recreates the continuous aggregate but never removes the `market_candles_1m_kr` table created by the upgrade.

🟡 Design / Quality Issues

6. Config key naming convention is inconsistent. New settings use SCREAMING_SNAKE_CASE while existing ones use lowercase:

```python
# Existing
kis_ohlcv_cache_max_days: int = 400
# New
KR_OHLCV_DUAL_ROUTE_ENABLED: bool = False
KR_OHLCV_V2_DUAL_WRITE_ENABLED: bool = False
```

Pydantic BaseSettings treats env vars case-insensitively, but attribute-level consistency matters for readability and grep-ability. Align to the project's existing lowercase convention.

7. Per-symbol route resolution issues one query per symbol:

```python
async def _resolve_route(symbol: str) -> list[str] | None:
    async with AsyncSessionLocal() as session:
        row = (await session.execute(...)).scalar_one_or_none()
```

With N symbols this creates N sequential round-trips. A single bulk query fetching all active symbols and their route eligibility would avoid this.

8. Materializing a list that is only used as a boolean:

```python
active_users = list(
    (await session.execute(select(User.id).where(...).order_by(User.id)))
    .scalars().all()
)
```

This is used only as a truthiness check later (`if active_users:`).

9. In-memory metrics (`kr_ohlcv_metrics`) are per-process and reset on restart, so they are not usable across multiple workers.

10. Redundant exception handling in task layer.

11. TimescaleDB readiness check can block worker startup:

```python
# taskiq_broker.py WorkerInitMiddleware.startup
await ensure_timescale_ready()  # no try/except
```

A development environment without TimescaleDB will fail to start the worker entirely, even if the OHLCV feature is disabled by flags. Consider guarding with a feature flag or converting the crash to a warning with graceful degradation.

🟢 Positive Notes
|
Actionable comments posted: 10
🧹 Nitpick comments (9)
app/services/kr_ohlcv_metrics.py (1)
10-10: Thread-safety note on global `Counter`.

`_counters[key] += value` is not atomic even under CPython's GIL (it's a read-modify-write across multiple bytecode ops). For a lightweight in-process metrics counter this is acceptable, but be aware that while concurrent async tasks sharing a thread won't race (single-threaded event loop), multi-threaded usage (e.g., from TaskIQ workers in thread pools) could lose increments. If precision matters later, consider `threading.Lock` or an asyncio-aware accumulator.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/services/kr_ohlcv_metrics.py` at line 10: the global Counter `_counters` is not thread-safe because operations like `_counters[key] += value` are read-modify-write; protect all mutations by adding a lock (e.g., create `_counters_lock = threading.Lock()` at module scope) and wrap every increment/decrement/clear of `_counters` in `with _counters_lock: ...` to ensure atomic updates; if the module is used only in async single-threaded contexts, prefer an `asyncio.Lock` and await it around mutations instead, but do not leave raw `_counters[...] += ...` unguarded.

tests/test_kr_cagg_migration.py (1)
44-48: `next(...)` without a default will raise `StopIteration` on mismatch.

If the migration SQL changes and the assertion target string no longer matches, `next(stmt for stmt in statements if ...)` on lines 44, 62, and 66 raises `StopIteration` instead of a clear test failure. Using `next(..., None)` with an explicit `assert` gives a better failure message.

Example:

```diff
-    create_cagg_sql = next(
-        stmt
-        for stmt in statements
-        if "CREATE MATERIALIZED VIEW market_candles_1h_kr" in stmt
-    )
+    create_cagg_sql = next(
+        (stmt for stmt in statements
+         if "CREATE MATERIALIZED VIEW market_candles_1h_kr" in stmt),
+        None,
+    )
+    assert create_cagg_sql is not None, "Expected CREATE MATERIALIZED VIEW statement"
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/test_kr_cagg_migration.py` around lines 44-48: the test uses `next(...)` without a default, which raises `StopIteration` on mismatch; update each usage (the generators assigning `create_cagg_sql`, `drop_cagg_sql`, and the replacement CAGG) to call `next(<generator>, None)` and then add an explicit assert that the result is not None with a helpful message (e.g., "expected CREATE MATERIALIZED VIEW market_candles_1h_kr in statements") so the test fails with a clear assertion error instead of `StopIteration`.

app/core/config.py (1)
49-52: Naming convention inconsistency with adjacent fields.

The new fields use `UPPER_CASE` (`KR_OHLCV_DUAL_ROUTE_ENABLED`, etc.) while the immediately preceding OHLCV-related fields use `lower_case` (`kis_ohlcv_cache_enabled`, `upbit_ohlcv_cache_enabled`, etc.). Since `case_sensitive=False` in `model_config`, this won't break env-var loading, but it creates an inconsistent API surface when accessing `settings.KR_OHLCV_DUAL_ROUTE_ENABLED` vs. `settings.kis_ohlcv_cache_enabled` in Python code.

Also, per coding guidelines, ensure `.env.example` or equivalent documentation is updated to include these new environment variables.

As per coding guidelines: "Treat `app/core/config.py` as the source of truth for runtime settings and environment keys. Keep root documentation and config examples synchronized when adding or renaming settings in `app/core/config.py`."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/core/config.py` around lines 49-52: rename the new settings to follow the existing lower_snake_case convention used by neighboring fields (e.g., change `KR_OHLCV_DUAL_ROUTE_ENABLED`, `KR_OHLCV_DUAL_ROUTE_CANARY_SYMBOLS`, `KR_OHLCV_DUAL_ROUTE_CANARY_ALL`, `KR_OHLCV_V2_DUAL_WRITE_ENABLED` to `kis_ohlcv_dual_route_enabled`, `kis_ohlcv_dual_route_canary_symbols`, `kis_ohlcv_dual_route_canary_all`, `kis_ohlcv_v2_dual_write_enabled`, or another consistent lower_snake_case prefix matching `kis_`/`upbit_` names), update all references in the codebase to the new symbols, and add the corresponding environment variable keys and descriptions to `.env.example` or the project config docs so runtime environment docs stay in sync with `app/core/config.py`.

app/core/taskiq_broker.py (1)
9-9: Service-specific import in core broker module.

Importing `ensure_timescale_ready` from `app.services.kr_ohlcv_timeseries_store` creates a coupling between the core broker entrypoint and a service-specific module. If this import fails (e.g., missing dependency), all broker processes fail to start. Consider lazy-importing within the guarded block if the feature-flag approach is adopted.

Based on learnings: "Do not add service-specific orchestration logic into core utility modules in `app/core/`."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/core/taskiq_broker.py` at line 9: the top-level import of `ensure_timescale_ready` from `app.services.kr_ohlcv_timeseries_store` couples the core broker and can break broker startup if that service or its deps are missing; remove the module-level import and instead perform a lazy import inside the guarded startup path where `ensure_timescale_ready` is invoked (e.g., inside the broker startup/ready hook or the feature-flag branch), wrapping the import and call in try/except to log failures and continue so broker processes don't crash.

alembic/versions/d2f4a8c1b9e3_add_kr_quarantine_and_bigint_v2.py (1)
62-103: Large data migration runs in a single transaction.

The `INSERT INTO market_candles_1m_kr_v2 ... SELECT FROM market_candles_1m_kr` can be extremely slow and hold a long-running transaction on large datasets, potentially causing lock contention and WAL bloat. Consider whether batching (e.g., via a PL/pgSQL loop with `LIMIT`/`OFFSET` or time-range windows) is warranted based on the expected row count.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@alembic/versions/d2f4a8c1b9e3_add_kr_quarantine_and_bigint_v2.py` around lines 62-103: the migration currently performs a single large INSERT ... SELECT via `op.execute`, which can hold a long transaction; change it to migrate in batches by iterating over ranges (e.g., time windows or primary-key ranges) and performing repeated smaller `INSERT INTO market_candles_1m_kr_v2 ... SELECT FROM market_candles_1m_kr` operations (or use a PL/pgSQL loop with LIMIT/OFFSET), committing between batches to avoid long locks and WAL bloat; ensure each batch uses the same `ROUND(...)::BIGINT` transformations and ON CONFLICT handling, and test for idempotence and resumability.

app/services/kr_ohlcv_timeseries_store.py (1)
44-91: Extract the repeated empty-frame construction into a private helper.

The same 7-column empty `pd.DataFrame` is constructed in three separate `return` paths (lines 46–56, 66–76, and 81–91). An `_empty_minute_frame()` helper (parallel to the existing `_empty_hourly_frame`) would eliminate the duplication.

♻️ Proposed refactor

```diff
+_MINUTE_COLUMNS = ["datetime", "open", "high", "low", "close", "volume", "value"]
+
+
+def _empty_minute_frame() -> pd.DataFrame:
+    return pd.DataFrame(columns=_MINUTE_COLUMNS)
+
+
 def _normalize_minute_frame(frame: pd.DataFrame) -> pd.DataFrame:
     if frame.empty:
-        return pd.DataFrame(
-            columns=[
-                "datetime",
-                "open",
-                "high",
-                "low",
-                "close",
-                "volume",
-                "value",
-            ]
-        )
+        return _empty_minute_frame()
     normalized = frame.copy()
     if "datetime" not in normalized.columns:
         if "date" in normalized.columns and "time" in normalized.columns:
             normalized["datetime"] = pd.to_datetime(...)
         else:
-            return pd.DataFrame(columns=["datetime", "open", "high", "low", "close", "volume", "value"])
+            return _empty_minute_frame()
     ...
     normalized = normalized.dropna(subset=["datetime"]).copy()
     if normalized.empty:
-        return pd.DataFrame(columns=["datetime", "open", "high", "low", "close", "volume", "value"])
+        return _empty_minute_frame()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/services/kr_ohlcv_timeseries_store.py` around lines 44-91: extract the repeated construction of the 7-column empty DataFrame into a new private helper named `_empty_minute_frame()`, mirroring the pattern used by `_empty_hourly_frame`; implement it to return a `pd.DataFrame` with columns ["datetime", "open", "high", "low", "close", "volume", "value"], then replace the three inline `return pd.DataFrame(...)` occurrences inside `_normalize_minute_frame` with calls to `_empty_minute_frame()`; ensure behavior is unchanged.

tests/test_kr_ohlcv_precompute.py (1)
102-121: Extract shared test helpers to `conftest.py` to eliminate duplication.

`_ScalarResult`, `_DummySession`, and `_DummySessionManager` are defined identically here (lines 102–121) and in `tests/test_kr_ohlcv_timeseries_store.py` (lines 12–48). Move them to `tests/conftest.py` as fixtures or module-level helpers.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/test_kr_ohlcv_precompute.py` around lines 102-121: the three duplicated test helper classes (`_ScalarResult`, `_DummySession`, `_DummySessionManager`) should be extracted into `tests/conftest.py` as shared helpers/fixtures; create module-level helpers (or async fixtures) that preserve the behavior used here: `_ScalarResult.scalar_one_or_none` returning the stored value, `_DummySession.execute` returning a `_ScalarResult` wrapping a Row-like object (with `is_active` and `nxt_eligible`), and `_DummySessionManager` implementing async `__aenter__`/`__aexit__` to yield the session; then update both test modules (`tests/test_kr_ohlcv_precompute.py` and `tests/test_kr_ohlcv_timeseries_store.py`) to import or use those fixtures instead of redefining the classes.

tests/test_kr_ohlcv_timeseries_store.py (1)
115-126: Missing test for the third `ensure_timescale_ready` check: CAGG absent.

`ensure_timescale_ready` raises `RuntimeError("market_candles_1h_kr continuous aggregate is missing")` when the CAGG lookup returns `None`, but there is no test covering this path. Only the extension-missing and minute-table-missing paths are exercised.

✅ Proposed additional test

```python
@pytest.mark.unit
@pytest.mark.asyncio
async def test_ensure_timescale_ready_fails_when_cagg_missing(monkeypatch):
    monkeypatch.setattr(settings, "ENVIRONMENT", "production", raising=False)
    dummy = _DummySession(values=["timescaledb", "public.market_candles_1m_kr", None])
    monkeypatch.setattr(
        kr_ohlcv_timeseries_store,
        "AsyncSessionLocal",
        lambda: _DummySessionManager(dummy),
    )
    with pytest.raises(RuntimeError, match="market_candles_1h_kr continuous aggregate is missing"):
        await kr_ohlcv_timeseries_store.ensure_timescale_ready(allow_test_bypass=False)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/test_kr_ohlcv_timeseries_store.py` around lines 115-126: add a unit test that covers the CAGG-missing branch in `ensure_timescale_ready`, similar to the existing ones: set `settings.ENVIRONMENT` to "production", use `_DummySession(values=["timescaledb", "public.market_candles_1m_kr", None])`, monkeypatch `kr_ohlcv_timeseries_store.AsyncSessionLocal` to return `_DummySessionManager(dummy)`, then assert that `await kr_ohlcv_timeseries_store.ensure_timescale_ready(allow_test_bypass=False)` raises `RuntimeError` with message "market_candles_1h_kr continuous aggregate is missing". Use `pytest.mark.asyncio` (and `pytest.mark.unit` for consistency).

app/tasks/kr_ohlcv_precompute_tasks.py (1)
14-51: The `try/except` blocks in both task wrappers are dead code.

`run_kr_ohlcv_incremental_precompute` and `run_kr_ohlcv_nightly_precompute` already wrap their entire body in a broad `except Exception` and return a `{"status": "failed", ...}` dict; they never raise. The outer `try/except` in these task wrappers will never execute.

Either remove the redundant guards, or (preferably) strip the inner `except` from the job functions and let the task layer be the sole error boundary.

♻️ Simplify task wrappers (option A: remove dead guard)

```diff
 @broker.task(
     task_name="ohlcv.kr.precompute.incremental",
     schedule=[{"cron": "*/5 8-20 * * 1-5", "cron_offset": "Asia/Seoul"}],
 )
 async def run_kr_ohlcv_incremental_precompute_task() -> dict[str, int | str]:
-    try:
-        return await run_kr_ohlcv_incremental_precompute()
-    except Exception as exc:
-        logger.error(
-            "TaskIQ KR OHLCV incremental precompute failed: %s",
-            exc,
-            exc_info=True,
-        )
-        return {
-            "status": "failed",
-            "mode": "incremental",
-            "error": str(exc),
-        }
+    return await run_kr_ohlcv_incremental_precompute()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/tasks/kr_ohlcv_precompute_tasks.py` around lines 14-51: the outer try/except in `run_kr_ohlcv_incremental_precompute_task` and `run_kr_ohlcv_nightly_precompute_task` is dead because the wrapped job functions already catch exceptions and return failure dicts; either remove the outer try/except blocks to simplify them, or (preferred) remove the inner broad except handlers inside the job functions so the task wrappers become the single error boundary that logs exceptions and returns the failure dicts.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@alembic/versions/c4d9e2a8f7b1_add_timescale_kr_ohlcv_tables.py`:
- Around line 14-31: The migration creates market_candles_1m with PRIMARY KEY
(market, symbol, ts) but omits the route column, causing rows from different
routes/exchanges with the same timestamp to be dropped; update the migration to
include route (or exchange) in the primary key (i.e., PRIMARY KEY (market,
symbol, route, ts)) or add a composite unique constraint that includes route so
multi-route data is preserved—modify the CREATE TABLE statement for
market_candles_1m in the migration (and any related index/constraint statements)
to reference the route column in the PK/unique key to avoid silent data loss.
In
`@alembic/versions/c9e4f5b8a2d1_rebuild_kr_hour_cagg_with_exchange_priority.py`:
- Around line 184-240: The downgrade() currently recreates the continuous
aggregate but does not remove the backing table created in upgrade
(market_candles_1m_kr); update downgrade() to either drop the table explicitly
(e.g., issue a DROP TABLE IF EXISTS for market_candles_1m_kr after removing
policies and before finishing) or, if keeping the table is intentional, add a
clear comment in downgrade() explaining that market_candles_1m_kr is preserved
on downgrade; reference the downgrade() function and the market_candles_1m_kr
table name when making the change.
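The drop-the-table option could be expressed as an ordered statement list (a sketch only; the real `downgrade()` also recreates the previous CAGG definition, which is omitted here):

```python
# Sketch of the statement ordering for the corrected downgrade():
# policies first, then dependent objects, then the backing table.
def downgrade_statements() -> list[str]:
    """SQL executed by downgrade(), in order (illustrative)."""
    return [
        "SELECT remove_continuous_aggregate_policy('market_candles_1h_kr', if_exists => TRUE);",
        "DROP MATERIALIZED VIEW IF EXISTS market_candles_1h_kr;",
        "DROP TABLE IF EXISTS market_candles_1m_kr;",
    ]
```

Each string would be passed to `op.execute(...)` inside the migration; keeping them in a list makes the ordering easy to assert in the SQL-inspection tests this PR already uses.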
In `@alembic/versions/d2f4a8c1b9e3_add_kr_quarantine_and_bigint_v2.py`:
- Around line 169-176: The downgrade SQL call using op.execute currently invokes
remove_continuous_aggregate_policy('market_candles_1h_kr_v2', if_not_exists =>
TRUE) which uses the wrong parameter name; update the SQL string in this
migration (and other occurrences) to call
remove_continuous_aggregate_policy('market_candles_1h_kr_v2', if_exists => TRUE)
so TimescaleDB 2.8+ recognizes the argument; locate the op.execute calls that
reference remove_continuous_aggregate_policy and replace any if_not_exists =>
TRUE with if_exists => TRUE.
In `@app/core/taskiq_broker.py`:
- Around line 15-16: The startup method calls ensure_timescale_ready()
unconditionally which makes TimescaleDB required for all broker processes;
update app/core/taskiq_broker.py so startup only invokes
ensure_timescale_ready() when KR is enabled and/or for processes that need
it—e.g., check settings.KR_OHLCV_DUAL_ROUTE_ENABLED (or a dedicated feature
flag) and the process type via is_worker_process/is_scheduler_process before
calling ensure_timescale_ready(); alternatively move this orchestration out of
the core module into a higher-level startup hook so non-KR brokers aren't
blocked.
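The guard described in that comment could look like the following sketch (the flag name comes from this PR's config; the wrapper function itself is hypothetical):

```python
import logging

logger = logging.getLogger(__name__)


async def guarded_startup(settings) -> None:
    """Sketch: only touch TimescaleDB when the KR OHLCV feature is enabled."""
    if not getattr(settings, "KR_OHLCV_DUAL_ROUTE_ENABLED", False):
        return
    try:
        # Lazy import keeps the core broker decoupled from the service module.
        from app.services.kr_ohlcv_timeseries_store import ensure_timescale_ready
        await ensure_timescale_ready()
    except Exception as exc:  # degrade gracefully instead of killing the worker
        logger.warning("TimescaleDB not ready; KR OHLCV features disabled: %s", exc)
```

With this shape, a development broker without TimescaleDB starts normally: the import/readiness failure is logged as a warning rather than crashing the process.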
In `@app/jobs/kr_ohlcv_precompute.py`:
- Around line 251-254: The call to record_fetch_success(route_market) is
executed regardless of whether any data was fetched, inflating the success
metric; update the logic in the loop that handles fetched data (the block
referencing merged, merged.empty, and route_market) so that
record_fetch_success(route_market) is only called when merged is not empty
(i.e., move the call below the if merged.empty: continue guard or add a
conditional that checks not merged.empty before invoking record_fetch_success).
Ensure you only change placement/guarding around record_fetch_success and keep
the existing continue behavior intact.
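The guarded placement described above, as a tiny runnable sketch (function and parameter names are illustrative, not the PR's actual signatures):

```python
import pandas as pd


def record_if_fetched(merged: pd.DataFrame, route_market: str, record_fetch_success) -> bool:
    """Only count a fetch as successful when it actually produced rows."""
    if merged.empty:
        return False  # non-trading day / empty API response: nothing to record
    record_fetch_success(route_market)
    return True
```

The existing `continue` in the loop maps onto the `False` branch; the success metric fires only on the `True` path.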
In `@app/services/kr_trading_calendar.py`:
- Around line 14-28: normalize_route currently rejects the string "NXT" because
_ROUTE_TO_EXCHANGE only contains keys "J" and "NX", while migration c9e4f5b8a2d1
treats "NXT" as a valid route; update _ROUTE_TO_EXCHANGE to include the mapping
"NXT": "NXT" so normalize_route("NXT") succeeds (or alternatively make the
migration stricter), ensuring the change is applied to the _ROUTE_TO_EXCHANGE
dict used by normalize_route.
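The suggested mapping fix, sketched below (only the `"NXT": "NXT"` entry comes from the comment above; the `"J"` to `"KRX"` value and the normalization details are assumptions about the real `kr_trading_calendar.py`):

```python
_ROUTE_TO_EXCHANGE = {
    "J": "KRX",    # assumed KRX mapping; the real table lives in kr_trading_calendar.py
    "NX": "NXT",
    "NXT": "NXT",  # the fix: accept the literal route the migration treats as valid
}


def normalize_route(route: str) -> str:
    """Map a raw route code to its exchange name, rejecting unknown codes (sketch)."""
    exchange = _ROUTE_TO_EXCHANGE.get(route.strip().upper())
    if exchange is None:
        raise ValueError(f"unsupported route: {route!r}")
    return exchange
```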
In `@tests/test_kr_cagg_migration.py`:
- Line 22: Add the pytest unit marker to the test functions by importing pytest
if not present and decorating the target functions (e.g.,
test_cagg_sql_policy_recreated and the other test at line 76) with
`@pytest.mark.unit` so they are categorized as unit tests; ensure the decorator
uses the registered marker name "unit" and apply it immediately above each test
function definition.
In `@tests/test_kr_ohlcv_precompute.py`:
- Around line 109-113: The _DummySession.execute stub has the wrong signature
and will raise TypeError when callers pass params; update the method signature
of _DummySession.execute to accept params=None (e.g., async def execute(self,
_statement, params=None)) and ignore or forward params as needed so it matches
the real session contract used by _resolve_route and mirrors the pattern in
test_kr_ohlcv_timeseries_store.py; keep return of _ScalarResult(...) unchanged.
- Around line 12-45: Add the pytest marker `@pytest.mark.unit` above each test
function in this file (for example above
test_incremental_bootstraps_new_symbols_with_7_days) so the tests are
discoverable by filtered runs; place the decorator immediately before the async
def, e.g. add `@pytest.mark.unit` above
test_incremental_bootstraps_new_symbols_with_7_days and do the same for the
other test functions in tests/test_kr_ohlcv_precompute.py.
In `@tests/test_kr_ohlcv_timeseries_store.py`:
- Around line 85-267: All nine test functions lack a pytest marker; add
`@pytest.mark.unit` above each test definition
(test_ensure_timescale_ready_bypasses_in_test_env,
test_ensure_timescale_ready_fails_when_extension_missing,
test_ensure_timescale_ready_fails_when_minute_table_missing,
test_upsert_market_candles_1m_returns_zero_for_empty_frame,
test_upsert_market_candles_1m_invalid_exchange_to_quarantine,
test_frame_from_hour_rows_builds_dataframe,
test_fetch_market_candles_1h_kr_returns_empty_when_range_invalid,
test_fetch_previous_close_before_bucket_prefers_hour_cagg,
test_fetch_previous_close_before_bucket_uses_minute_fallback_tiebreak) so they
are discoverable by marker-filtered runs; ensure pytest is imported and place
the decorator directly above each async/def test function.
---
Duplicate comments:
In `@alembic/versions/c4d9e2a8f7b1_add_timescale_kr_ohlcv_tables.py`:
- Around line 104-131: The downgrade() block is using the wrong keyword for the
Timescale removal calls; change the SQL argument from if_not_exists => TRUE to
if_exists => TRUE for each remove call (specifically in the
remove_continuous_aggregate_policy('market_candles_1h_kr'),
remove_retention_policy('market_candles_1h_kr'), and
remove_retention_policy('market_candles_1m') statements) so the functions use
if_exists => TRUE instead of if_not_exists => TRUE.
---
Nitpick comments:
In `@alembic/versions/d2f4a8c1b9e3_add_kr_quarantine_and_bigint_v2.py`:
- Around line 62-103: The migration currently performs a single large INSERT ...
SELECT via op.execute which can hold a long transaction; change it to migrate in
batches by iterating over ranges (e.g., time windows or primary-key ranges) and
performing repeated smaller INSERT INTO market_candles_1m_kr_v2 ... SELECT FROM
market_candles_1m_kr operations (or use a PL/pgSQL loop with LIMIT/OFFSET)
committing between batches to avoid long locks and WAL bloat; update the
migration function that calls op.execute and implement a bounded-loop batching
strategy using the ts (or PK) column to select slices, ensure each batch uses
the same ROUND(...)::BIGINT transformations and ON CONFLICT handling, and test
for idempotence and resumeability.
In `@app/core/config.py`:
- Around line 49-52: Rename the new settings to follow the existing
lower_snake_case convention used by neighboring fields (e.g., change
KR_OHLCV_DUAL_ROUTE_ENABLED, KR_OHLCV_DUAL_ROUTE_CANARY_SYMBOLS,
KR_OHLCV_DUAL_ROUTE_CANARY_ALL, KR_OHLCV_V2_DUAL_WRITE_ENABLED to
kis_ohlcv_dual_route_enabled, kis_ohlcv_dual_route_canary_symbols,
kis_ohlcv_dual_route_canary_all, kis_ohlcv_v2_dual_write_enabled or another
consistent lower_snake_case prefix matching kis_/upbit_ names), update all
references in the codebase to the new symbols (search for the four uppercase
names), and add the corresponding environment variable keys and descriptions to
.env.example or the project config docs so runtime environment docs stay in sync
with app/core/config.py.
In `@app/core/taskiq_broker.py`:
- Line 9: The top-level import of ensure_timescale_ready from
app.services.kr_ohlcv_timeseries_store couples the core broker and can break
broker startup if that service or its deps are missing; remove the module-level
import and instead perform a lazy import inside the guarded startup path where
ensure_timescale_ready is invoked (e.g., inside the broker startup/ready hook or
the feature-flag branch), wrapping the import and call in try/except to log
failures and continue so broker processes don’t crash; reference
ensure_timescale_ready and the broker startup/ready function to locate where to
move the import.
In `@app/services/kr_ohlcv_metrics.py`:
- Line 10: The global Counter _counters is not thread-safe because operations
like _counters[key] += value are read-modify-write; protect all mutations by
adding a lock (e.g., create _counters_lock = threading.Lock() at module scope)
and wrap every increment/decrement/clear of _counters in with _counters_lock:
... to ensure atomic updates; if the module is used only in async
single-threaded contexts prefer an asyncio.Lock and await it around mutations
instead, but do not leave raw _counters[...] += ... unguarded.
In `@app/services/kr_ohlcv_timeseries_store.py`:
- Around line 44-91: Extract the repeated construction of the 7-column empty
DataFrame into a new private helper named _empty_minute_frame(), mirroring the
pattern used by _empty_hourly_frame; implement _empty_minute_frame() to return a
pd.DataFrame with columns
["datetime","open","high","low","close","volume","value"] and then replace the
three inline return pd.DataFrame(...) occurrences inside _normalize_minute_frame
with calls to _empty_minute_frame(); ensure _normalize_minute_frame still
imports/uses pd and that behavior is unchanged.
In `@app/tasks/kr_ohlcv_precompute_tasks.py`:
- Around line 14-51: The outer try/except in the task wrappers
run_kr_ohlcv_incremental_precompute_task and
run_kr_ohlcv_nightly_precompute_task is dead because
run_kr_ohlcv_incremental_precompute and run_kr_ohlcv_nightly_precompute already
catch exceptions and return failure dicts; either remove the outer try/except
blocks from the task functions to simplify them, or (preferred) remove the inner
broad except handlers inside run_kr_ohlcv_incremental_precompute and
run_kr_ohlcv_nightly_precompute so the task wrappers become the single error
boundary that logs exceptions and returns the failure dicts (update only those
functions named above to keep behavior consistent).
In `@tests/test_kr_cagg_migration.py`:
- Around line 44-48: The test uses next(...) without a default which raises
StopIteration on mismatch; update each usage (e.g., the generator that assigns
create_cagg_sql and the similar generators around lines where drop_cagg_sql and
the replacement cagg are found) to call next(<generator>, None) and then add an
explicit assert that the result is not None with a helpful message (e.g.,
"expected CREATE MATERIALIZED VIEW market_candles_1h_kr in statements") so the
test fails with a clear assertion error instead of StopIteration.
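The suggested pattern looks like this (the statement strings are illustrative):

```python
statements = [
    "DROP MATERIALIZED VIEW IF EXISTS market_candles_1h_kr",
    "CREATE MATERIALIZED VIEW market_candles_1h_kr WITH (timescaledb.continuous) AS ...",
]

# With a default, a mismatch yields None instead of raising StopIteration...
create_cagg_sql = next(
    (sql for sql in statements if "CREATE MATERIALIZED VIEW market_candles_1h_kr" in sql),
    None,
)
# ...so the failure surfaces as a readable assertion message instead.
assert create_cagg_sql is not None, (
    "expected CREATE MATERIALIZED VIEW market_candles_1h_kr in statements"
)

# A pattern that is absent simply resolves to None.
missing = next((sql for sql in statements if "no_such_view" in sql), None)
```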
In `@tests/test_kr_ohlcv_precompute.py`:
- Around line 102-121: The three duplicated test helper classes (_ScalarResult,
_DummySession, _DummySessionManager) should be extracted into tests/conftest.py
as shared helpers/fixtures; create module-level helpers (or async fixtures) that
return the same behavior used here—_ScalarResult with scalar_one_or_none
returning the stored value, _DummySession.execute returning an instance of
_ScalarResult wrapping a Row-like object (with is_active and nxt_eligible), and
_DummySessionManager implementing async __aenter__/__aexit__ to yield
_DummySession—and update both tests (tests/test_kr_ohlcv_precompute.py and
tests/test_kr_ohlcv_timeseries_store.py) to import or use those fixtures instead
of redefining the classes.
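A sketch of what the shared `tests/conftest.py` helpers might look like, mirroring the duplicated doubles described above (behavior is illustrative, not the project's exact code):

```python
import asyncio


class _ScalarResult:
    def __init__(self, value):
        self._value = value

    def scalar_one_or_none(self):
        return self._value


class _DummySession:
    # Accept params=None to match the real session.execute contract.
    async def execute(self, _statement, params=None):
        row = type("Row", (), {"is_active": True, "nxt_eligible": True})
        return _ScalarResult(row)


class _DummySessionManager:
    def __init__(self, session):
        self._session = session

    async def __aenter__(self):
        return self._session

    async def __aexit__(self, exc_type, exc, tb):
        return False


async def _demo():
    # Exercise the doubles the way a store/job function would.
    async with _DummySessionManager(_DummySession()) as session:
        result = await session.execute("SELECT ...")
        return result.scalar_one_or_none()


row = asyncio.run(_demo())
```

Both test modules would then import (or receive via fixture) these helpers instead of redefining them.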
In `@tests/test_kr_ohlcv_timeseries_store.py`:
- Around line 115-126: Add a unit test that covers the CAGG-missing branch in
ensure_timescale_ready: create a test similar to the existing ones that sets
settings.ENVIRONMENT to "production", uses _DummySession(values=["timescaledb",
"public.market_candles_1m_kr", None]) and monkeypatches
kr_ohlcv_timeseries_store.AsyncSessionLocal to return
_DummySessionManager(dummy), then assert that await
kr_ohlcv_timeseries_store.ensure_timescale_ready(allow_test_bypass=False) raises
RuntimeError with message "market_candles_1h_kr continuous aggregate is
missing". Ensure the test uses pytest.mark.asyncio (and pytest.mark.unit if you
want consistency).
```sql
CREATE TABLE IF NOT EXISTS market_candles_1m (
    market TEXT NOT NULL,
    symbol TEXT NOT NULL,
    ts TIMESTAMPTZ NOT NULL,
    open NUMERIC(20, 6) NOT NULL,
    high NUMERIC(20, 6) NOT NULL,
    low NUMERIC(20, 6) NOT NULL,
    close NUMERIC(20, 6) NOT NULL,
    volume BIGINT NOT NULL DEFAULT 0,
    value BIGINT NOT NULL DEFAULT 0,
    source TEXT NOT NULL DEFAULT 'kis',
    route TEXT NOT NULL,
    fetched_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (market, symbol, ts)
)
```
Primary key on (market, symbol, ts) silently drops multi-route data.
The route column is NOT NULL but excluded from the primary key. If KRX and NXT both report the same symbol at the same timestamp, only one row survives the PK constraint, losing data from one exchange. This is resolved in the subsequent migration (c9e4f5b8a2d1) which creates market_candles_1m_kr with (exchange, symbol, ts) PK, but be aware of this data-loss window if ingest is running before all migrations complete.
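The data-loss mechanics can be illustrated with a small stdlib sketch: an upsert keyed only by `(market, symbol, ts)` keeps one row per timestamp (the later route overwrites the earlier one), while adding `route` to the key preserves both exchanges. Row values are illustrative.

```python
rows = [
    {"market": "KR", "symbol": "005930", "ts": "2026-02-19T09:01", "route": "J", "close": 100.5},
    {"market": "KR", "symbol": "005930", "ts": "2026-02-19T09:01", "route": "NX", "close": 100.6},
]


def upsert(rows, key_fields):
    # Dict keyed by the PK columns: an ON CONFLICT ... DO UPDATE analogue.
    table = {}
    for row in rows:
        table[tuple(row[f] for f in key_fields)] = row
    return table

narrow_pk = upsert(rows, ("market", "symbol", "ts"))          # NX overwrites J
wide_pk = upsert(rows, ("market", "symbol", "ts", "route"))   # both routes survive
```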
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@alembic/versions/c4d9e2a8f7b1_add_timescale_kr_ohlcv_tables.py` around lines
14 - 31, The migration creates market_candles_1m with PRIMARY KEY (market,
symbol, ts) but omits the route column, causing rows from different
routes/exchanges with the same timestamp to be dropped; update the migration to
include route (or exchange) in the primary key (i.e., PRIMARY KEY (market,
symbol, route, ts)) or add a composite unique constraint that includes route so
multi-route data is preserved—modify the CREATE TABLE statement for
market_candles_1m in the migration (and any related index/constraint statements)
to reference the route column in the PK/unique key to avoid silent data loss.
```python
def downgrade() -> None:
    op.execute(
        """
        SELECT remove_continuous_aggregate_policy(
            'market_candles_1h_kr',
            if_not_exists => TRUE
        )
        """
    )

    op.execute("DROP MATERIALIZED VIEW IF EXISTS market_candles_1h_kr")

    op.execute(
        """
        CREATE MATERIALIZED VIEW market_candles_1h_kr
        WITH (timescaledb.continuous) AS
        SELECT
            symbol,
            time_bucket(INTERVAL '1 hour', ts, 'Asia/Seoul') AS bucket_start,
            first(open, ts) AS open,
            MAX(high) AS high,
            MIN(low) AS low,
            last(close, ts) AS close,
            SUM(volume)::BIGINT AS volume,
            SUM(value)::BIGINT AS value
        FROM market_candles_1m_kr
        WHERE exchange IN ('KRX', 'NXT')
        GROUP BY symbol, bucket_start
        WITH NO DATA
        """
    )

    op.execute(
        """
        ALTER MATERIALIZED VIEW market_candles_1h_kr
        SET (timescaledb.materialized_only = false)
        """
    )

    op.execute(
        """
        CREATE UNIQUE INDEX IF NOT EXISTS ux_market_candles_1h_kr_symbol_bucket
        ON market_candles_1h_kr (symbol, bucket_start)
        """
    )

    op.execute(
        """
        SELECT add_continuous_aggregate_policy(
            'market_candles_1h_kr',
            start_offset => INTERVAL '8 days',
            end_offset => INTERVAL '1 minute',
            schedule_interval => INTERVAL '5 minutes',
            if_not_exists => TRUE
        )
        """
    )
```
Downgrade does not drop market_candles_1m_kr table.
The upgrade creates market_candles_1m_kr (line 12) and populates it, but the downgrade only recreates the CAGG view — it never drops market_candles_1m_kr. If the intent of downgrade is to fully revert, the table should be dropped. If it's intentionally left (to preserve data), consider documenting that choice.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@alembic/versions/c9e4f5b8a2d1_rebuild_kr_hour_cagg_with_exchange_priority.py`
around lines 184 - 240, The downgrade() currently recreates the continuous
aggregate but does not remove the backing table created in upgrade
(market_candles_1m_kr); update downgrade() to either drop the table explicitly
(e.g., issue a DROP TABLE IF EXISTS for market_candles_1m_kr after removing
policies and before finishing) or, if keeping the table is intentional, add a
clear comment in downgrade() explaining that market_candles_1m_kr is preserved
on downgrade; reference the downgrade() function and the market_candles_1m_kr
table name when making the change.
```python
op.execute(
    """
    SELECT remove_continuous_aggregate_policy(
        'market_candles_1h_kr_v2',
        if_not_exists => TRUE
    )
    """
)
```
🧩 Analysis chain
🌐 Web query:
TimescaleDB remove_continuous_aggregate_policy if_exists parameter documentation
💡 Result:
TimescaleDB documents `if_exists` as an optional parameter to `remove_continuous_aggregate_policy`, with this signature:

```
remove_continuous_aggregate_policy(continuous_aggregate REGCLASS, if_exists BOOL = NULL) RETURNS VOID
```

`if_exists` behavior (per docs):

- When `true`, it prints a warning/notice instead of raising an error if the policy doesn't exist (the default is `false`) [1].
- It was renamed in TimescaleDB 2.8 from `if_not_exists` to `if_exists` [1].

Example:

```sql
SELECT remove_continuous_aggregate_policy('cpu_view', if_exists => true);
```

Sources: Timescale API reference page for remove_continuous_aggregate_policy() [1].
🏁 Script executed:

```shell
# Check the actual migration file content at lines 169-176
cat -n alembic/versions/d2f4a8c1b9e3_add_kr_quarantine_and_bigint_v2.py | sed -n '165,180p'
```

Repository: mgh3326/auto_trader
Length of output: 598

🏁 Script executed:

```shell
# Also check the other migration files mentioned for the same pattern
fd "\.py$" alembic/versions/ | head -20 | while read f; do
  if grep -l "remove_continuous_aggregate_policy" "$f" 2>/dev/null; then
    echo "=== $f ==="
    grep -n "if_not_exists\|if_exists" "$f" | head -5
  fi
done
```

Repository: mgh3326/auto_trader
Length of output: 984

🏁 Script executed:

```shell
# Check for TimescaleDB version constraints in the codebase
fd -e txt -e md -e toml -e yml -e yaml | xargs grep -l "timescaledb\|TimescaleDB" 2>/dev/null | head -10
```

Repository: mgh3326/auto_trader
Length of output: 45

🏁 Script executed:

```shell
# Check the context around remove_continuous_aggregate_policy calls to confirm they're for removal
grep -B 3 "remove_continuous_aggregate_policy" alembic/versions/d2f4a8c1b9e3_add_kr_quarantine_and_bigint_v2.py | head -20
```

Repository: mgh3326/auto_trader
Length of output: 166

🏁 Script executed:

```shell
# Also check one of the other files for add_continuous_aggregate_policy to understand the pattern
grep -B 2 "add_continuous_aggregate_policy\|remove_continuous_aggregate_policy" alembic/versions/c4d9e2a8f7b1_add_timescale_kr_ohlcv_tables.py | head -30
```

Repository: mgh3326/auto_trader
Length of output: 220
Use if_exists => TRUE instead of if_not_exists => TRUE for remove_continuous_aggregate_policy.
TimescaleDB 2.8+ uses if_exists as the parameter for remove_continuous_aggregate_policy. The current code uses if_not_exists, which is incorrect and will cause the downgrade to fail since the parameter is not recognized.
This issue affects multiple migration files (c4d9e2a8f7b1, c9e4f5b8a2d1, d2f4a8c1b9e3) with 13+ occurrences total.
Proposed fix for this file

```diff
 op.execute(
     """
     SELECT remove_continuous_aggregate_policy(
         'market_candles_1h_kr_v2',
-        if_not_exists => TRUE
+        if_exists => TRUE
     )
     """
 )
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
op.execute(
    """
    SELECT remove_continuous_aggregate_policy(
        'market_candles_1h_kr_v2',
        if_exists => TRUE
    )
    """
)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@alembic/versions/d2f4a8c1b9e3_add_kr_quarantine_and_bigint_v2.py` around
lines 169 - 176, The downgrade SQL call using op.execute currently invokes
remove_continuous_aggregate_policy('market_candles_1h_kr_v2', if_not_exists =>
TRUE) which uses the wrong parameter name; update the SQL string in this
migration (and other occurrences) to call
remove_continuous_aggregate_policy('market_candles_1h_kr_v2', if_exists => TRUE)
so TimescaleDB 2.8+ recognizes the argument; locate the op.execute calls that
reference remove_continuous_aggregate_policy and replace any if_not_exists =>
TRUE with if_exists => TRUE.
```python
async def startup(self) -> None:
    await ensure_timescale_ready()
```
Unconditional ensure_timescale_ready() makes TimescaleDB a hard startup dependency for all broker processes.
This call runs before is_worker_process / is_scheduler_process branching, so every broker, worker, and scheduler process will fail to start if TimescaleDB isn't available or the KR tables haven't been migrated yet. This conflicts with the stated goal of an "ingest-only rollout" — existing non-KR workers would crash if deployed before the migrations run.
Consider gating this behind settings.KR_OHLCV_DUAL_ROUTE_ENABLED or a dedicated feature flag:
Proposed fix
```diff
 async def startup(self) -> None:
-    await ensure_timescale_ready()
+    if settings.KR_OHLCV_DUAL_ROUTE_ENABLED:
+        await ensure_timescale_ready()
     if self.broker.is_worker_process:
```

As per coding guidelines: "Do not add service-specific orchestration logic into core utility modules in app/core/."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/core/taskiq_broker.py` around lines 15 - 16, The startup method calls
ensure_timescale_ready() unconditionally which makes TimescaleDB required for
all broker processes; update app/core/taskiq_broker.py so startup only invokes
ensure_timescale_ready() when KR is enabled and/or for processes that need
it—e.g., check settings.KR_OHLCV_DUAL_ROUTE_ENABLED (or a dedicated feature
flag) and the process type via is_worker_process/is_scheduler_process before
calling ensure_timescale_ready(); alternatively move this orchestration out of
the core module into a higher-level startup hook so non-KR brokers aren't
blocked.
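A minimal sketch of the flag-gated startup, using a hypothetical settings object and a stub readiness probe (names mirror the review's suggestion, not the real module):

```python
import asyncio
from types import SimpleNamespace

# Hypothetical stand-ins for app settings and the TimescaleDB probe.
settings = SimpleNamespace(KR_OHLCV_DUAL_ROUTE_ENABLED=False)
calls = []


async def ensure_timescale_ready():
    calls.append("timescale")


async def startup(is_worker_process: bool) -> None:
    # Only KR-enabled worker/scheduler processes take the TimescaleDB dependency.
    if settings.KR_OHLCV_DUAL_ROUTE_ENABLED and is_worker_process:
        await ensure_timescale_ready()


asyncio.run(startup(is_worker_process=True))
calls_when_disabled = list(calls)  # flag off: broker starts without TimescaleDB

settings.KR_OHLCV_DUAL_ROUTE_ENABLED = True
asyncio.run(startup(is_worker_process=True))
```

With the flag off, non-KR brokers never touch TimescaleDB at startup, which matches the ingest-only rollout goal.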
```python
record_fetch_success(route_market)
if merged.empty:
    continue
```
record_fetch_success is called even when merged is empty (no data fetched).
The success metric is incremented unconditionally at line 252, before the if merged.empty: continue guard. An API call that returns no data in-session (e.g., a holiday or data gap) is still recorded as a fetch success, which inflates the counter.
🐛 Proposed fix
```diff
-        record_fetch_success(route_market)
         if merged.empty:
             continue
+        record_fetch_success(route_market)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/jobs/kr_ohlcv_precompute.py` around lines 251 - 254, The call to
record_fetch_success(route_market) is executed regardless of whether any data
was fetched, inflating the success metric; update the logic in the loop that
handles fetched data (the block referencing merged, merged.empty, and
route_market) so that record_fetch_success(route_market) is only called when
merged is not empty (i.e., move the call below the if merged.empty: continue
guard or add a conditional that checks not merged.empty before invoking
record_fetch_success). Ensure you only change placement/guarding around
record_fetch_success and keep the existing continue behavior intact.
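The intended ordering — guard first, record success only when data actually arrived — in a self-contained sketch (the counter and row shapes are illustrative):

```python
counters = {"kr_fetch_success": 0}


def record_fetch_success(route_market: str) -> None:
    counters["kr_fetch_success"] += 1


def handle_fetch(merged_rows, route_market):
    # Guard first: an empty fetch (holiday, data gap) is not a success.
    if not merged_rows:
        return
    record_fetch_success(route_market)
    # ... downstream upsert would go here ...


handle_fetch([], "KR-J")                      # empty: counter untouched
handle_fetch([{"close": 100.5}], "KR-J")      # real data: counter incremented
```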
```python
_ROUTE_TO_EXCHANGE = {
    "J": "KRX",
    "NX": "NXT",
}
_EXCHANGE_TO_ROUTE = {
    "KRX": "J",
    "NXT": "NX",
}


def normalize_route(route: str) -> str:
    normalized = str(route or "").strip().upper()
    if normalized not in _ROUTE_TO_EXCHANGE:
        raise ValueError(f"Unsupported KR route: {route}")
    return normalized
```
🧩 Analysis chain
🏁 Script executed:

```shell
# First, find the migration file c9e4f5b8a2d1
find . -type f -name "*c9e4f5b8a2d1*" -o -name "*migration*" | head -20
```

Repository: mgh3326/auto_trader
Length of output: 383

🏁 Script executed:

```shell
# Search for the migration content and normalize_route usage
rg -n "c9e4f5b8a2d1|normalize_route" --type=py -C2
```

Repository: mgh3326/auto_trader
Length of output: 2662

🏁 Script executed:

```shell
# Look for 'NXT' route usage in the codebase
rg -n "'NXT'|\"NXT\"" --type=py -C2
```

Repository: mgh3326/auto_trader
Length of output: 7847

🏁 Script executed:

```shell
# Get full context of kr_trading_calendar.py
cat -n app/services/kr_trading_calendar.py
```

Repository: mgh3326/auto_trader
Length of output: 3224
normalize_route rejects 'NXT' but the migration accepts it as a valid route.
In migration c9e4f5b8a2d1, route values 'J', 'NX', and 'NXT' are all accepted (lines 50, 81, 96). Both 'NX' and 'NXT' are treated as equivalent routes mapping to the 'NXT' exchange. However, _ROUTE_TO_EXCHANGE only maps 'J' and 'NX', so calling normalize_route("NXT") raises ValueError. If upstream data carries 'NXT' as a route value, this breaks session-bound lookups.
Add 'NXT' → 'NXT' to _ROUTE_TO_EXCHANGE to align with the migration's route handling, or document and enforce that only 'J' and 'NX' are canonical route identifiers while the migration must be updated to reject 'NXT'.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/services/kr_trading_calendar.py` around lines 14 - 28, normalize_route
currently rejects the string "NXT" because _ROUTE_TO_EXCHANGE only contains keys
"J" and "NX", while migration c9e4f5b8a2d1 treats "NXT" as a valid route; update
_ROUTE_TO_EXCHANGE to include the mapping "NXT": "NXT" so normalize_route("NXT")
succeeds (or alternatively make the migration stricter), ensuring the change is
applied to the _ROUTE_TO_EXCHANGE dict used by normalize_route.
```python
    return module


def test_cagg_sql_policy_recreated(monkeypatch):
```
Missing pytest markers on test functions.
Both test functions lack the required @pytest.mark.unit marker. Since these tests perform no I/O and only verify SQL string emission, they should be categorized as unit tests.
Proposed fix
```diff
+import pytest
+
+
+@pytest.mark.unit
 def test_cagg_sql_policy_recreated(monkeypatch):
```

```diff
+@pytest.mark.unit
 def test_v2_migration_creates_bigint_tables_and_policy(monkeypatch):
```

As per coding guidelines: "Use test markers (@pytest.mark.unit, @pytest.mark.integration, @pytest.mark.slow) to categorize tests appropriately" and "Use only registered pytest markers (slow, integration, unit) when strict markers are enabled."
Also applies to: 76-76
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/test_kr_cagg_migration.py` at line 22, Add the pytest unit marker to
the test functions by importing pytest if not present and decorating the target
functions (e.g., test_cagg_sql_policy_recreated and the other test at line 76)
with `@pytest.mark.unit` so they are categorized as unit tests; ensure the
decorator uses the registered marker name "unit" and apply it immediately above
each test function definition.
```python
@pytest.mark.asyncio
async def test_incremental_bootstraps_new_symbols_with_7_days(monkeypatch):
    monkeypatch.setattr(
        kr_ohlcv_precompute.kr_ohlcv_timeseries_store,
        "ensure_timescale_ready",
        AsyncMock(return_value=None),
    )
    monkeypatch.setattr(
        kr_ohlcv_precompute,
        "_collect_kr_symbols",
        AsyncMock(return_value={"005930", "000660"}),
    )

    latest_bucket_mock = AsyncMock(
        side_effect=[None, datetime.datetime(2026, 2, 19, 9, 0)]
    )
    monkeypatch.setattr(
        kr_ohlcv_precompute.kr_ohlcv_timeseries_store,
        "fetch_latest_hourly_bucket",
        latest_bucket_mock,
    )

    sync_mock = AsyncMock(return_value={"status": "completed", "rows": 12})
    monkeypatch.setattr(kr_ohlcv_precompute, "_sync_symbol_minutes", sync_mock)

    result = await kr_ohlcv_precompute.run_kr_ohlcv_incremental_precompute()

    assert result["status"] == "completed"
    assert result["mode"] == "incremental"
    assert result["symbols"] == 2
    assert result["bootstrapped"] == 1

    called_days = [call.args[1] for call in sync_mock.await_args_list]
    assert sorted(called_days) == [1, 7]
```
🛠️ Refactor suggestion | 🟠 Major
Add @pytest.mark.unit markers to all test functions.
Same issue as tests/test_kr_ohlcv_timeseries_store.py — none of the five tests carry a marker, making them invisible to filtered pytest runs.
As per coding guidelines: "Use test markers (@pytest.mark.unit, @pytest.mark.integration, @pytest.mark.slow) to categorize tests appropriately."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/test_kr_ohlcv_precompute.py` around lines 12 - 45, Add the pytest
marker `@pytest.mark.unit` above each test function in this file (for example
above test_incremental_bootstraps_new_symbols_with_7_days) so the tests are
discoverable by filtered runs; place the decorator immediately before the async
def, e.g. add `@pytest.mark.unit` above
test_incremental_bootstraps_new_symbols_with_7_days and do the same for the
other test functions in tests/test_kr_ohlcv_precompute.py.
```python
class _DummySession:
    async def execute(self, _statement):
        return _ScalarResult(
            type("Row", (), {"is_active": True, "nxt_eligible": True})
        )
```
_DummySession.execute is missing the params=None parameter.

```python
async def execute(self, _statement):  # ← no params argument
```

If _resolve_route (or any future call site) invokes session.execute(stmt, params) with a positional or keyword params argument, this will raise TypeError. The pattern used in test_kr_ohlcv_timeseries_store.py correctly includes params=None.
🐛 Proposed fix
```diff
-    async def execute(self, _statement):
+    async def execute(self, _statement, params=None):
+        del params
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/test_kr_ohlcv_precompute.py` around lines 109 - 113, The
_DummySession.execute stub has the wrong signature and will raise TypeError when
callers pass params; update the method signature of _DummySession.execute to
accept params=None (e.g., async def execute(self, _statement, params=None)) and
ignore or forward params as needed so it matches the real session contract used
by _resolve_route and mirrors the pattern in test_kr_ohlcv_timeseries_store.py;
keep return of _ScalarResult(...) unchanged.
```python
@pytest.mark.asyncio
async def test_ensure_timescale_ready_bypasses_in_test_env(monkeypatch):
    monkeypatch.setattr(settings, "ENVIRONMENT", "test", raising=False)

    def _should_not_open_session():
        raise AssertionError("AsyncSessionLocal should not be called in test env")

    monkeypatch.setattr(
        kr_ohlcv_timeseries_store,
        "AsyncSessionLocal",
        _should_not_open_session,
    )

    await kr_ohlcv_timeseries_store.ensure_timescale_ready()


@pytest.mark.asyncio
async def test_ensure_timescale_ready_fails_when_extension_missing(monkeypatch):
    monkeypatch.setattr(settings, "ENVIRONMENT", "production", raising=False)
    dummy = _DummySession(values=[None])
    monkeypatch.setattr(
        kr_ohlcv_timeseries_store,
        "AsyncSessionLocal",
        lambda: _DummySessionManager(dummy),
    )

    with pytest.raises(RuntimeError, match="TimescaleDB extension is not installed"):
        await kr_ohlcv_timeseries_store.ensure_timescale_ready(allow_test_bypass=False)


@pytest.mark.asyncio
async def test_ensure_timescale_ready_fails_when_minute_table_missing(monkeypatch):
    monkeypatch.setattr(settings, "ENVIRONMENT", "production", raising=False)
    dummy = _DummySession(values=["timescaledb", None])
    monkeypatch.setattr(
        kr_ohlcv_timeseries_store,
        "AsyncSessionLocal",
        lambda: _DummySessionManager(dummy),
    )

    with pytest.raises(RuntimeError, match="market_candles_1m_kr table is missing"):
        await kr_ohlcv_timeseries_store.ensure_timescale_ready(allow_test_bypass=False)


@pytest.mark.asyncio
async def test_upsert_market_candles_1m_returns_zero_for_empty_frame():
    result = await kr_ohlcv_timeseries_store.upsert_market_candles_1m(
        symbol="005930",
        exchange="KRX",
        route="J",
        frame=pd.DataFrame(),
    )

    assert result == {
        "rows": 0,
        "min_ts": None,
        "max_ts": None,
    }


@pytest.mark.asyncio
async def test_upsert_market_candles_1m_invalid_exchange_to_quarantine(monkeypatch):
    session = _CaptureWriteSession()
    monkeypatch.setattr(
        kr_ohlcv_timeseries_store,
        "AsyncSessionLocal",
        lambda: _DummySessionManager(session),
    )

    frame = pd.DataFrame(
        [
            {
                "datetime": pd.Timestamp("2026-02-19 09:01:00"),
                "open": 100.0,
                "high": 101.0,
                "low": 99.0,
                "close": 100.5,
                "volume": 100,
                "value": 10050,
            }
        ]
    )

    result = await kr_ohlcv_timeseries_store.upsert_market_candles_1m(
        symbol="005930",
        exchange="BADX",
        route="J",
        frame=frame,
    )

    assert result["rows"] == 0
    assert result["quarantined_rows"] == 1
    assert any(
        "INSERT INTO market_candles_ingest_quarantine" in stmt
        for stmt in session.statements
    )


def test_frame_from_hour_rows_builds_dataframe():
    frame = kr_ohlcv_timeseries_store.frame_from_hour_rows(
        [
            {
                "datetime": pd.Timestamp("2026-02-19 09:00:00"),
                "date": pd.Timestamp("2026-02-19").date(),
                "time": pd.Timestamp("2026-02-19 09:00:00").time(),
                "open": 100.0,
                "high": 101.0,
                "low": 99.0,
                "close": 100.5,
                "volume": 100,
                "value": 10050,
            }
        ]
    )

    assert len(frame) == 1
    assert set(frame.columns) == {
        "datetime",
        "date",
        "time",
        "open",
        "high",
        "low",
        "close",
        "volume",
        "value",
    }


@pytest.mark.asyncio
async def test_fetch_market_candles_1h_kr_returns_empty_when_range_invalid():
    start_bucket = pd.Timestamp("2026-02-19 12:00:00").to_pydatetime()
    end_bucket = pd.Timestamp("2026-02-19 10:00:00").to_pydatetime()

    frame = await kr_ohlcv_timeseries_store.fetch_market_candles_1h_kr(
        symbol="005930",
        start_bucket=start_bucket,
        end_bucket=end_bucket,
    )

    assert frame.empty


@pytest.mark.asyncio
async def test_fetch_previous_close_before_bucket_prefers_hour_cagg(monkeypatch):
    session = _RecordingSession(cagg_value=101.25, minute_value=88.0)
    monkeypatch.setattr(
        kr_ohlcv_timeseries_store,
        "AsyncSessionLocal",
        lambda: _DummySessionManager(session),
    )

    close = await kr_ohlcv_timeseries_store.fetch_previous_close_before_bucket(
        symbol="005930",
        before_bucket=datetime.datetime(2026, 2, 19, 10, 0),
    )

    assert close == 101.25
    assert any("FROM market_candles_1h_kr" in sql for sql in session.statements)
    assert not any("FROM market_candles_1m_kr" in sql for sql in session.statements)


@pytest.mark.asyncio
async def test_fetch_previous_close_before_bucket_uses_minute_fallback_tiebreak(
    monkeypatch,
):
    session = _RecordingSession(cagg_value=None, minute_value=77.7)
    monkeypatch.setattr(
        kr_ohlcv_timeseries_store,
        "AsyncSessionLocal",
        lambda: _DummySessionManager(session),
    )

    close = await kr_ohlcv_timeseries_store.fetch_previous_close_before_bucket(
        symbol="005930",
        before_bucket=datetime.datetime(2026, 2, 19, 10, 0),
    )

    assert close == 77.7
    minute_sql = next(
        sql for sql in session.statements if "FROM market_candles_1m_kr" in sql
    )
    assert "CASE WHEN exchange = 'KRX' THEN 1 ELSE 0 END DESC" in minute_sql
```
🛠️ Refactor suggestion | 🟠 Major
Add @pytest.mark.unit markers to all tests.
None of the 9 test functions carry a pytest marker. With strict marker enforcement enabled, these tests are invisible to filtered runs (pytest -m unit) and violate the test categorization guideline.
🏷️ Example fix (applies to all test functions)
```diff
+@pytest.mark.unit
 @pytest.mark.asyncio
 async def test_ensure_timescale_ready_bypasses_in_test_env(monkeypatch):
     ...

+@pytest.mark.unit
 @pytest.mark.asyncio
 async def test_ensure_timescale_ready_fails_when_extension_missing(monkeypatch):
     ...

+@pytest.mark.unit
 def test_frame_from_hour_rows_builds_dataframe():
     ...

 # (apply to all remaining test functions)
```

As per coding guidelines: "Use test markers (@pytest.mark.unit, @pytest.mark.integration, @pytest.mark.slow) to categorize tests appropriately."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/test_kr_ohlcv_timeseries_store.py` around lines 85 - 267, All nine test
functions lack a pytest marker; add `@pytest.mark.unit` above each test definition
(test_ensure_timescale_ready_bypasses_in_test_env,
test_ensure_timescale_ready_fails_when_extension_missing,
test_ensure_timescale_ready_fails_when_minute_table_missing,
test_upsert_market_candles_1m_returns_zero_for_empty_frame,
test_upsert_market_candles_1m_invalid_exchange_to_quarantine,
test_frame_from_hour_rows_builds_dataframe,
test_fetch_market_candles_1h_kr_returns_empty_when_range_invalid,
test_fetch_previous_close_before_bucket_prefers_hour_cagg,
test_fetch_previous_close_before_bucket_uses_minute_fallback_tiebreak) so they
are discoverable by marker-filtered runs; ensure pytest is imported and place
the decorator directly above each async/def test function.
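The KRX-over-NXT tiebreak that the last test asserts on can be mirrored in plain Python: pick the latest timestamp, preferring KRX on ties, just as the SQL `CASE WHEN exchange = 'KRX' THEN 1 ELSE 0 END DESC` ordering does. Row values are illustrative; same-day zero-padded time strings keep lexicographic comparison valid.

```python
rows = [
    {"ts": "09:59", "exchange": "NXT", "close": 88.0},
    {"ts": "09:59", "exchange": "KRX", "close": 77.7},
    {"ts": "09:58", "exchange": "KRX", "close": 55.5},
]


def previous_close(rows):
    # Latest timestamp wins; KRX wins ties, mirroring the SQL CASE expression.
    best = max(rows, key=lambda r: (r["ts"], 1 if r["exchange"] == "KRX" else 0))
    return best["close"]
```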




Summary
Test Plan
```shell
uv run pytest --no-cov tests/test_kr_cagg_migration.py tests/test_kr_ohlcv_precompute.py tests/test_kr_ohlcv_timeseries_store.py -q
uv run pytest --no-cov tests/test_mcp_server_tools.py::test_get_quote_korean_equity tests/test_routers.py::TestHealthRouter::test_health_check -q
```

Summary by CodeRabbit
New Features
Chores