Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions src/biz_layer/mem_memorize.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@
from infra_layer.adapters.out.search.repository.episodic_memory_es_repository import (
EpisodicMemoryEsRepository,
)
from infra_layer.adapters.out.search.repository.event_log_milvus_repository import (
EventLogMilvusRepository,
)
from infra_layer.adapters.out.search.repository.foresight_es_repository import (
ForesightEsRepository,
)
from infra_layer.adapters.out.search.repository.foresight_milvus_repository import (
ForesightMilvusRepository,
)
from biz_layer.mem_sync import MemorySyncService
from core.context.context import get_current_app_info

Expand Down Expand Up @@ -1119,6 +1128,28 @@ async def save_memory_docs(
episodic_repo = get_bean_by_type(EpisodicMemoryRawRepository)
episodic_es_repo = get_bean_by_type(EpisodicMemoryEsRepository)
episodic_milvus_repo = get_bean_by_type(EpisodicMemoryMilvusRepository)

# Dedup: delete existing records with the same parent_id before insert
parent_ids_seen: set = set()
for doc in episodic_docs:
pid = getattr(doc, "parent_id", None)
if pid and pid not in parent_ids_seen:
parent_ids_seen.add(pid)
try:
await episodic_repo.delete_by_parent_id(pid)
await episodic_es_repo.delete_by_filters(
filters={"parent_id": pid}
)
await episodic_milvus_repo.delete_by_filters(
filters={"parent_id": pid}
)
except Exception as e:
logger.warning(
"[Dedup] Failed to delete old episodic records for parent_id=%s: %s",
pid,
e,
)

saved_episodic: List[Any] = []

for doc in episodic_docs:
Expand Down Expand Up @@ -1158,6 +1189,24 @@ async def save_memory_docs(
event_log_docs = grouped_docs.get(MemoryType.EVENT_LOG, [])
if event_log_docs:
event_log_repo = get_bean_by_type(EventLogRecordRawRepository)
event_log_milvus_repo = get_bean_by_type(EventLogMilvusRepository)

# Dedup: delete existing event_log records with the same parent_id
el_parent_ids_seen: set = set()
for doc in event_log_docs:
pid = getattr(doc, "parent_id", None)
if pid and pid not in el_parent_ids_seen:
el_parent_ids_seen.add(pid)
try:
await event_log_repo.delete_by_parent_id(pid)
await event_log_milvus_repo.delete_by_parent_id(pid)
except Exception as e:
logger.warning(
"[Dedup] Failed to delete old event_log records for parent_id=%s: %s",
pid,
e,
)

saved_event_logs = await event_log_repo.create_batch(event_log_docs)
saved_result[MemoryType.EVENT_LOG] = saved_event_logs

Expand Down Expand Up @@ -1410,3 +1459,93 @@ async def memorize(request: MemorizeRequest) -> int:
logger.error(f"[mem_memorize] ❌ Memory extraction failed: {e}")
traceback.print_exc()
return 0


async def cleanup_expired_foresights() -> int:
"""
Remove expired foresight records from all stores (MongoDB, Elasticsearch, Milvus).

ForesightRecord has a validity window defined by ``start_time`` / ``end_time``
(date strings in YYYY-MM-DD format). Once ``end_time`` is in the past the
record is no longer useful, but the current pipeline never deletes it. This
helper performs the housekeeping.

Returns:
Total number of expired foresight records deleted from MongoDB.
"""
from infra_layer.adapters.out.persistence.document.memory.foresight_record import (
ForesightRecord,
)

today_str = to_date_str(get_now_with_timezone())
logger.info(
"[ForesightCleanup] Starting expired foresight cleanup (today=%s)", today_str
)

# 1. Query expired records from MongoDB
try:
expired_records: List[Any] = await ForesightRecord.find(
{
"end_time": {"$lt": today_str, "$ne": None},
}
).to_list()
except Exception as e:
logger.error("[ForesightCleanup] Failed to query expired foresights: %s", e)
return 0

if not expired_records:
logger.info("[ForesightCleanup] No expired foresight records found")
return 0

logger.info(
"[ForesightCleanup] Found %d expired foresight records to delete",
len(expired_records),
)

# 2. Delete from search stores (best-effort)
foresight_es_repo = get_bean_by_type(ForesightEsRepository)
foresight_milvus_repo = get_bean_by_type(ForesightMilvusRepository)

for record in expired_records:
record_id = str(record.id) if record.id else None
if not record_id:
continue
try:
await foresight_milvus_repo.delete_by_id(record_id)
except Exception as e:
logger.warning(
"[ForesightCleanup] Failed to delete from Milvus id=%s: %s",
record_id,
e,
)
try:
await foresight_es_repo.delete_by_filters(filters={"_id": record_id})
except Exception as e:
logger.warning(
"[ForesightCleanup] Failed to delete from ES id=%s: %s",
record_id,
e,
)

# 3. Delete from MongoDB
deleted_count = 0
try:
foresight_repo = get_bean_by_type(ForesightRecordRawRepository)
for record in expired_records:
record_id = str(record.id) if record.id else None
if record_id:
result = await foresight_repo.delete_by_id(record_id)
if result:
deleted_count += 1
except Exception as e:
logger.error(
"[ForesightCleanup] Failed to delete expired foresights from MongoDB: %s",
e,
)

logger.info(
"[ForesightCleanup] ✅ Cleanup complete: deleted %d/%d expired foresight records",
deleted_count,
len(expired_records),
)
return deleted_count
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,42 @@ async def delete_by_user_id(
logger.error("❌ Failed to delete episodic memories by user ID: %s", e)
return 0

async def delete_by_parent_id(
self, parent_id: str, session: Optional[AsyncClientSession] = None
) -> int:
"""
Delete all episodic memories by parent ID (e.g. memcell event_id).

Used for deduplication: when re-processing the same source, delete old
records before inserting new ones.

Args:
parent_id: Parent memory ID
session: Optional MongoDB session for transaction support

Returns:
Number of deleted records
"""
try:
result = await self.model.find(
{"parent_id": parent_id}
).delete(session=session)
count = result.deleted_count if result else 0
if count > 0:
logger.info(
"✅ Deleted %d episodic memories by parent_id=%s",
count,
parent_id,
)
return count
except Exception as e:
logger.error(
"❌ Failed to delete episodic memories by parent_id=%s: %s",
parent_id,
e,
)
return 0

async def find_by_filter_paginated(
self,
query_filter: Optional[Dict[str, Any]] = None,
Expand Down
Loading