Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion python/api/poll.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ async def process(self, input: dict, request: Request) -> dict | Response:
"last_result": task_details.get("last_result"),
"attachments": task_details.get("attachments", []),
"context_id": task_details.get("context_id"),
"project_name": task_details.get("project_name"),
"project_color": task_details.get("project_color"),
"project": {
"name": task_details.get("project_name"),
"color": task_details.get("project_color"),
},
})

# Add type-specific fields
Expand Down Expand Up @@ -119,4 +125,4 @@ async def process(self, input: dict, request: Request) -> dict | Response:
"notifications": notifications,
"notifications_guid": notification_manager.guid,
"notifications_version": len(notification_manager.updates),
}
}
37 changes: 32 additions & 5 deletions python/api/scheduler_task_create.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
TaskScheduler, ScheduledTask, AdHocTask, PlannedTask, TaskSchedule,
serialize_task, parse_task_schedule, parse_task_plan, TaskType
)
from python.helpers.projects import load_basic_project_data
from python.helpers.localization import Localization
from python.helpers.print_style import PrintStyle
import random
Expand All @@ -27,7 +28,26 @@ async def process(self, input: Input, request: Request) -> Output:
system_prompt = input.get("system_prompt", "")
prompt = input.get("prompt")
attachments = input.get("attachments", [])
context_id = input.get("context_id", None)

requested_project_slug = input.get("project_name")
if isinstance(requested_project_slug, str):
requested_project_slug = requested_project_slug.strip() or None
else:
requested_project_slug = None

project_slug = requested_project_slug
project_color = None

if project_slug:
try:
metadata = load_basic_project_data(requested_project_slug)
project_color = metadata.get("color") or None
except Exception as exc:
printer.error(f"SchedulerTaskCreate: failed to load project '{project_slug}': {exc}")
return {"error": f"Saving project failed: {project_slug}"}

# Always dedicated context for scheduler tasks created by ui
task_context_id = None

# Check if schedule is provided (for ScheduledTask)
schedule = input.get("schedule", {})
Expand Down Expand Up @@ -77,8 +97,10 @@ async def process(self, input: Input, request: Request) -> Output:
prompt=prompt,
schedule=task_schedule,
attachments=attachments,
context_id=context_id,
timezone=timezone
context_id=task_context_id,
timezone=timezone,
project_name=project_slug,
project_color=project_color,
)
elif plan:
# Create a planned task
Expand All @@ -94,7 +116,9 @@ async def process(self, input: Input, request: Request) -> Output:
prompt=prompt,
plan=task_plan,
attachments=attachments,
context_id=context_id
context_id=task_context_id,
project_name=project_slug,
project_color=project_color,
)
else:
# Create an ad-hoc task
Expand All @@ -105,7 +129,9 @@ async def process(self, input: Input, request: Request) -> Output:
prompt=prompt,
token=token,
attachments=attachments,
context_id=context_id
context_id=task_context_id,
project_name=project_slug,
project_color=project_color,
)
# Verify token after creation
if isinstance(task, AdHocTask):
Expand All @@ -132,5 +158,6 @@ async def process(self, input: Input, request: Request) -> Output:
printer.print(f"Serialized adhoc task, token in response: '{task_dict.get('token')}'")

return {
"ok": True,
"task": task_dict
}
4 changes: 4 additions & 0 deletions python/api/scheduler_task_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ async def process(self, input: Input, request: Request) -> Output:
if "attachments" in input:
update_params["attachments"] = input.get("attachments", [])

if "project_name" in input or "project_color" in input:
return {"error": "Project changes are not allowed"}

# Update schedule if this is a scheduled task and schedule is provided
if isinstance(task, ScheduledTask) and "schedule" in input:
schedule_data = input.get("schedule", {})
Expand Down Expand Up @@ -85,5 +88,6 @@ async def process(self, input: Input, request: Request) -> Output:
task_dict = serialize_task(updated_task)

return {
"ok": True,
"task": task_dict
}
4 changes: 2 additions & 2 deletions python/api/scheduler_tasks_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ async def process(self, input: Input, request: Request) -> Output:
# Use the scheduler's convenience method for task serialization
tasks_list = scheduler.serialize_all_tasks()

return {"tasks": tasks_list}
return {"ok": True, "tasks": tasks_list}

except Exception as e:
PrintStyle.error(f"Failed to list tasks: {str(e)} {traceback.format_exc()}")
return {"error": f"Failed to list tasks: {str(e)} {traceback.format_exc()}", "tasks": []}
return {"ok": False, "error": f"Failed to list tasks: {str(e)} {traceback.format_exc()}", "tasks": []}
42 changes: 34 additions & 8 deletions python/helpers/task_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ class BaseTask(BaseModel):
system_prompt: str
prompt: str
attachments: list[str] = Field(default_factory=list)
project_name: str | None = Field(default=None)
project_color: str | None = Field(default=None)
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
last_run: datetime | None = None
Expand Down Expand Up @@ -181,6 +183,9 @@ def check_schedule(self, frequency_seconds: float = 60.0) -> bool:
def get_next_run(self) -> datetime | None:
return None

def is_dedicated(self) -> bool:
return self.context_id == self.uuid

def get_next_run_minutes(self) -> int | None:
next_run = self.get_next_run()
if next_run is None:
Expand Down Expand Up @@ -243,14 +248,18 @@ def create(
prompt: str,
token: str,
attachments: list[str] = list(),
context_id: str | None = None
context_id: str | None = None,
project_name: str | None = None,
project_color: str | None = None
):
return cls(name=name,
system_prompt=system_prompt,
prompt=prompt,
attachments=attachments,
token=token,
context_id=context_id)
context_id=context_id,
project_name=project_name,
project_color=project_color)

def update(self,
name: str | None = None,
Expand Down Expand Up @@ -288,7 +297,9 @@ def create(
schedule: TaskSchedule,
attachments: list[str] = list(),
context_id: str | None = None,
timezone: str | None = None
timezone: str | None = None,
project_name: str | None = None,
project_color: str | None = None,
):
# Set timezone in schedule if provided
if timezone is not None:
Expand All @@ -301,7 +312,9 @@ def create(
prompt=prompt,
attachments=attachments,
schedule=schedule,
context_id=context_id)
context_id=context_id,
project_name=project_name,
project_color=project_color)

def update(self,
name: str | None = None,
Expand Down Expand Up @@ -365,14 +378,18 @@ def create(
prompt: str,
plan: TaskPlan,
attachments: list[str] = list(),
context_id: str | None = None
context_id: str | None = None,
project_name: str | None = None,
project_color: str | None = None
):
return cls(name=name,
system_prompt=system_prompt,
prompt=prompt,
plan=plan,
attachments=attachments,
context_id=context_id)
context_id=context_id,
project_name=project_name,
project_color=project_color)

def update(self,
name: str | None = None,
Expand Down Expand Up @@ -1037,12 +1054,19 @@ def serialize_task(task: Union[ScheduledTask, AdHocTask, PlannedTask]) -> Dict[s
"system_prompt": task.system_prompt,
"prompt": task.prompt,
"attachments": task.attachments,
"project_name": task.project_name,
"project_color": task.project_color,
"created_at": serialize_datetime(task.created_at),
"updated_at": serialize_datetime(task.updated_at),
"last_run": serialize_datetime(task.last_run),
"next_run": serialize_datetime(task.get_next_run()),
"last_result": task.last_result,
"context_id": task.context_id
"context_id": task.context_id,
"dedicated_context": task.is_dedicated(),
"project": {
"name": task.project_name,
"color": task.project_color,
},
}

# Add type-specific fields
Expand Down Expand Up @@ -1102,11 +1126,13 @@ def deserialize_task(task_data: Dict[str, Any], task_class: Optional[Type[T]] =
"system_prompt": task_data.get("system_prompt", ""),
"prompt": task_data.get("prompt", ""),
"attachments": task_data.get("attachments", []),
"project_name": task_data.get("project_name"),
"project_color": task_data.get("project_color"),
"created_at": parse_datetime(task_data.get("created_at")),
"updated_at": parse_datetime(task_data.get("updated_at")),
"last_run": parse_datetime(task_data.get("last_run")),
"last_result": task_data.get("last_result"),
"context_id": task_data.get("context_id")
"context_id": task_data.get("context_id"),
}

# Add type-specific fields
Expand Down
35 changes: 31 additions & 4 deletions python/tools/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
)
from agent import AgentContext
from python.helpers import persist_chat
from python.helpers.projects import get_context_project_name, load_basic_project_data

DEFAULT_WAIT_TIMEOUT = 300

Expand Down Expand Up @@ -38,6 +39,20 @@ async def execute(self, **kwargs):
else:
return Response(message=f"Unknown method '{self.name}:{self.method}'", break_loop=False)

def _resolve_project_metadata(self) -> tuple[str | None, str | None]:
context = self.agent.context
if not context:
return (None, None)
project_slug = get_context_project_name(context)
if not project_slug:
return (None, None)
try:
metadata = load_basic_project_data(project_slug)
color = metadata.get("color") or None
except Exception:
color = None
return project_slug, color

async def list_tasks(self, **kwargs) -> Response:
state_filter: list[str] | None = kwargs.get("state", None)
type_filter: list[str] | None = kwargs.get("type", None)
Expand Down Expand Up @@ -153,13 +168,17 @@ async def create_scheduled_task(self, **kwargs) -> Response:
if not re.match(cron_regex, task_schedule.to_crontab()):
return Response(message="Invalid cron expression: " + task_schedule.to_crontab(), break_loop=False)

project_slug, project_color = self._resolve_project_metadata()

task = ScheduledTask.create(
name=name,
system_prompt=system_prompt,
prompt=prompt,
attachments=attachments,
schedule=task_schedule,
context_id=None if dedicated_context else self.agent.context.id
context_id=None if dedicated_context else self.agent.context.id,
project_name=project_slug,
project_color=project_color,
)
await TaskScheduler.get().add_task(task)
return Response(message=f"Scheduled task '{name}' created: {task.uuid}", break_loop=False)
Expand All @@ -172,13 +191,17 @@ async def create_adhoc_task(self, **kwargs) -> Response:
token: str = str(random.randint(1000000000000000000, 9999999999999999999))
dedicated_context: bool = kwargs.get("dedicated_context", False)

project_slug, project_color = self._resolve_project_metadata()

task = AdHocTask.create(
name=name,
system_prompt=system_prompt,
prompt=prompt,
attachments=attachments,
token=token,
context_id=None if dedicated_context else self.agent.context.id
context_id=None if dedicated_context else self.agent.context.id,
project_name=project_slug,
project_color=project_color,
)
await TaskScheduler.get().add_task(task)
return Response(message=f"Adhoc task '{name}' created: {task.uuid}", break_loop=False)
Expand Down Expand Up @@ -206,14 +229,18 @@ async def create_planned_task(self, **kwargs) -> Response:
done=[]
)

project_slug, project_color = self._resolve_project_metadata()

# Create planned task with task plan
task = PlannedTask.create(
name=name,
system_prompt=system_prompt,
prompt=prompt,
attachments=attachments,
plan=task_plan,
context_id=None if dedicated_context else self.agent.context.id
context_id=None if dedicated_context else self.agent.context.id,
project_name=project_slug,
project_color=project_color
)
await TaskScheduler.get().add_task(task)
return Response(message=f"Planned task '{name}' created: {task.uuid}", break_loop=False)
Expand All @@ -229,7 +256,7 @@ async def wait_for_task(self, **kwargs) -> Response:
return Response(message=f"Task not found: {task_uuid}", break_loop=False)

if task.context_id == self.agent.context.id:
return Response(message="You can only wait for tasks running in a different chat context (dedicated_context=True).", break_loop=False)
return Response(message="You can only wait for tasks running in their own dedicated context.", break_loop=False)

done = False
elapsed = 0
Expand Down
Loading