39 changes: 32 additions & 7 deletions decart/client.py
@@ -2,7 +2,7 @@
import aiohttp
from pydantic import ValidationError
from .errors import InvalidAPIKeyError, InvalidBaseURLError, InvalidInputError
from .models import ModelDefinition
from .models import ImageModelDefinition, _MODELS
from .process.request import send_request
from .queue.client import QueueClient

@@ -27,7 +27,15 @@ class DecartClient:
Example:
```python
client = DecartClient(api_key="your-key")
result = await client.process({

# Image generation (sync) - use process()
image = await client.process({
"model": models.image("lucy-pro-t2i"),
"prompt": "A serene lake at sunset",
})

# Video generation (async) - use queue
result = await client.queue.submit_and_poll({
"model": models.video("lucy-pro-t2v"),
"prompt": "A serene lake at sunset",
})
@@ -55,7 +63,8 @@ def __init__(
@property
def queue(self) -> QueueClient:
"""
Queue client for async job-based video and image generation.
Queue client for async job-based video generation.
Only video models support the queue API.

Example:
```python
@@ -97,22 +106,38 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):

async def process(self, options: dict[str, Any]) -> bytes:
"""
Process video or image generation/transformation.
Process image generation/transformation synchronously.
Only image models support the process API.

For video generation, use the queue API instead:
result = await client.queue.submit_and_poll({...})

Args:
options: Processing options including model and inputs
- model: ImageModelDefinition from models.image()
- prompt: Text prompt for generation
- Additional model-specific inputs

Returns:
Generated/transformed media as bytes
Generated/transformed image as bytes

Raises:
InvalidInputError: If inputs are invalid
InvalidInputError: If inputs are invalid or model is not an image model
ProcessingError: If processing fails
"""
if "model" not in options:
raise InvalidInputError("model is required")

model: ModelDefinition = options["model"]
model: ImageModelDefinition = options["model"]

# Validate that this is an image model (check against registry)
if model.name not in _MODELS["image"]:
raise InvalidInputError(
f"Model '{model.name}' is not supported by process(). "
f"Only image models support sync processing. "
f"For video models, use client.queue.submit_and_poll() instead."
)

cancel_token = options.get("cancel_token")

inputs = {k: v for k, v in options.items() if k not in ("model", "cancel_token")}
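Taken together, these changes make `process()` an image-only entry point, with video traffic redirected to the queue. A minimal caller-side sketch of the new behaviour (the `decart.errors` module path is assumed from the relative import in this diff; model names come from `models.py`):

```python
import asyncio
import os

from decart import DecartClient, models
from decart.errors import InvalidInputError  # module path assumed from the relative import above


async def main() -> None:
    async with DecartClient(api_key=os.getenv("DECART_API_KEY", "your-api-key-here")) as client:
        # Image models stay on the synchronous process() path and return raw bytes.
        image = await client.process({
            "model": models.image("lucy-pro-t2i"),
            "prompt": "A serene lake at sunset",
        })
        print(f"Received {len(image)} bytes of image data")

        # Video models are now rejected before any request is sent.
        try:
            await client.process({
                "model": models.video("lucy-pro-t2v"),
                "prompt": "A serene lake at sunset",
            })
        except InvalidInputError as exc:
            print(f"Rejected as expected: {exc}")


if __name__ == "__main__":
    asyncio.run(main())
```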
54 changes: 45 additions & 9 deletions decart/models.py
@@ -1,4 +1,4 @@
from typing import Literal, Optional, List
from typing import Literal, Optional, List, Generic, TypeVar
from pydantic import BaseModel, Field, ConfigDict
from .errors import ModelNotFoundError
from .types import FileInput, MotionTrajectoryInput
@@ -17,20 +17,34 @@
ImageModels = Literal["lucy-pro-t2i", "lucy-pro-i2i"]
Model = Literal[RealTimeModels, VideoModels, ImageModels]

# Type variable for model name
ModelT = TypeVar("ModelT", bound=str)


class DecartBaseModel(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)


class ModelDefinition(DecartBaseModel):
name: str
class ModelDefinition(DecartBaseModel, Generic[ModelT]):
name: ModelT
url_path: str
fps: int = Field(ge=1)
width: int = Field(ge=1)
height: int = Field(ge=1)
input_schema: type[BaseModel]


# Type aliases for model definitions that support specific APIs
ImageModelDefinition = ModelDefinition[ImageModels]
"""Type alias for model definitions that support synchronous processing (process API)."""

VideoModelDefinition = ModelDefinition[VideoModels]
"""Type alias for model definitions that support queue processing (queue API)."""

RealTimeModelDefinition = ModelDefinition[RealTimeModels]
"""Type alias for model definitions that support realtime streaming."""


class TextToVideoInput(BaseModel):
prompt: str = Field(..., min_length=1, max_length=1000)
seed: Optional[int] = None
@@ -212,23 +226,45 @@ class ImageToImageInput(DecartBaseModel):

class Models:
@staticmethod
def realtime(model: RealTimeModels) -> ModelDefinition:
def realtime(model: RealTimeModels) -> RealTimeModelDefinition:
"""Get a realtime model definition for WebRTC streaming."""
try:
return _MODELS["realtime"][model]
return _MODELS["realtime"][model] # type: ignore[return-value]
except KeyError:
raise ModelNotFoundError(model)

@staticmethod
def video(model: VideoModels) -> ModelDefinition:
def video(model: VideoModels) -> VideoModelDefinition:
"""
Get a video model definition.
Video models only support the queue API.

Available models:
- "lucy-pro-t2v" - Text-to-video
- "lucy-pro-i2v" - Image-to-video
- "lucy-pro-v2v" - Video-to-video
- "lucy-pro-flf2v" - First-last-frame-to-video
- "lucy-dev-i2v" - Image-to-video (Dev quality)
- "lucy-fast-v2v" - Video-to-video (Fast quality)
- "lucy-motion" - Image-to-motion-video
"""
try:
return _MODELS["video"][model]
return _MODELS["video"][model] # type: ignore[return-value]
except KeyError:
raise ModelNotFoundError(model)

@staticmethod
def image(model: ImageModels) -> ModelDefinition:
def image(model: ImageModels) -> ImageModelDefinition:
"""
Get an image model definition.
Image models only support the process (sync) API.

Available models:
- "lucy-pro-t2i" - Text-to-image
- "lucy-pro-i2i" - Image-to-image
"""
try:
return _MODELS["image"][model]
return _MODELS["image"][model] # type: ignore[return-value]
except KeyError:
raise ModelNotFoundError(model)

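The `Generic[ModelT]` parametrisation is what lets a static checker tell the three model families apart before anything runs. A self-contained sketch of the same pattern (simplified and standalone; this is not the SDK's actual class, and the field values are placeholders):

```python
from typing import Generic, Literal, TypeVar

from pydantic import BaseModel

ImageModels = Literal["lucy-pro-t2i", "lucy-pro-i2i"]
VideoModels = Literal["lucy-pro-t2v", "lucy-pro-i2v"]

ModelT = TypeVar("ModelT", bound=str)


class ModelDefinition(BaseModel, Generic[ModelT]):
    name: ModelT
    url_path: str


ImageModelDefinition = ModelDefinition[ImageModels]
VideoModelDefinition = ModelDefinition[VideoModels]


def process(model: ImageModelDefinition) -> None:
    """Stand-in for client.process(): only image-parametrised definitions type-check."""


t2v = ModelDefinition[VideoModels](name="lucy-pro-t2v", url_path="/placeholder/t2v")
process(t2v)  # runs at runtime, but mypy/pyright flag the incompatible type
```

At runtime the three aliases resolve to the same class, so the registry checks in `client.py` and `queue/client.py` remain the real guard; the aliases only surface the common mistakes in the type checker first.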
23 changes: 18 additions & 5 deletions decart/queue/client.py
@@ -4,7 +4,7 @@
import aiohttp
from pydantic import ValidationError

from ..models import ModelDefinition
from ..models import VideoModelDefinition, _MODELS
from ..errors import InvalidInputError
from .request import submit_job, get_job_status, get_job_content
from .types import (
@@ -25,7 +25,8 @@

class QueueClient:
"""
Queue client for async job-based video and image generation.
Queue client for async job-based video generation.
Only video models support the queue API.

Jobs are submitted and processed asynchronously, allowing you to
poll for status and retrieve results when ready.
@@ -59,23 +60,35 @@ async def _get_session(self) -> aiohttp.ClientSession:

async def submit(self, options: dict[str, Any]) -> JobSubmitResponse:
"""
Submit a job to the queue for async processing.
Submit a video generation job to the queue for async processing.
Only video models are supported.
Returns immediately with job_id and initial status.

Args:
options: Submit options including model and inputs
- model: VideoModelDefinition from models.video()
- prompt: Text prompt for generation
- Additional model-specific inputs

Returns:
JobSubmitResponse with job_id and status

Raises:
InvalidInputError: If inputs are invalid
InvalidInputError: If inputs are invalid or model is not a video model
QueueSubmitError: If submission fails
"""
if "model" not in options:
raise InvalidInputError("model is required")

model: ModelDefinition = options["model"]
model: VideoModelDefinition = options["model"]

# Validate that this is a video model (check against registry)
if model.name not in _MODELS["video"]:
raise InvalidInputError(
f"Model '{model.name}' is not supported by queue API. "
f"Only video models support async queue processing. "
f"For image models, use client.process() instead."
)

inputs = {k: v for k, v in options.items() if k not in ("model", "cancel_token")}

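This is the mirror image of the `process()` guard: `submit()` now rejects anything that is not a video model before building the request, and the happy path returns as soon as the job is queued. A short caller-side sketch (`submit()` and the `JobSubmitResponse` fields are taken from this diff; everything else mirrors the examples below):

```python
import asyncio
import os

from decart import DecartClient, models


async def main() -> None:
    async with DecartClient(api_key=os.getenv("DECART_API_KEY", "your-api-key-here")) as client:
        # submit() only queues the job; it returns once the API acknowledges it.
        job = await client.queue.submit({
            "model": models.video("lucy-pro-t2v"),
            "prompt": "A serene lake at sunset",
        })
        print(f"Queued job {job.job_id} with status {job.status}")
        # To block until the result is ready, use submit_and_poll() as in the examples below.


if __name__ == "__main__":
    asyncio.run(main())
```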
18 changes: 13 additions & 5 deletions examples/process_url.py
@@ -1,3 +1,8 @@
"""
Video transformation from URL example using the Queue API.
Video models only support async queue processing.
"""

import asyncio
import os
from decart import DecartClient, models
@@ -6,18 +11,21 @@
async def main() -> None:
async with DecartClient(api_key=os.getenv("DECART_API_KEY", "your-api-key-here")) as client:
print("Transforming video from URL...")
result = await client.process(
result = await client.queue.submit_and_poll(
{
"model": models.video("lucy-pro-v2v"),
"prompt": "Watercolor painting style",
"data": "https://docs.platform.decart.ai/assets/example-video.mp4",
"on_status_change": lambda job: print(f" Status: {job.status}"),
}
)

with open("output_url.mp4", "wb") as f:
f.write(result)

print("Video saved to output_url.mp4")
if result.status == "completed":
with open("output_url.mp4", "wb") as f:
f.write(result.data)
print("Video saved to output_url.mp4")
else:
print(f"Job failed: {result.error}")


if __name__ == "__main__":
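For symmetry with the URL-based video example above, an image-to-image call would stay on the synchronous `process()` path. A hypothetical companion example: passing a URL via the `"data"` key for `lucy-pro-i2i` is an assumption carried over from the video example, not something this diff confirms, and the asset URL and output filename are placeholders:

```python
"""Image transformation from URL example using the synchronous process API (sketch)."""

import asyncio
import os

from decart import DecartClient, models


async def main() -> None:
    async with DecartClient(api_key=os.getenv("DECART_API_KEY", "your-api-key-here")) as client:
        print("Transforming image from URL...")
        result = await client.process(
            {
                "model": models.image("lucy-pro-i2i"),
                "prompt": "Watercolor painting style",
                # URL input assumed to behave as in the video example; not confirmed here.
                "data": "https://example.com/path/to/input-image.jpg",
            }
        )

        with open("output_i2i.jpg", "wb") as f:
            f.write(result)
        print("Image saved to output_i2i.jpg")


if __name__ == "__main__":
    asyncio.run(main())
```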
36 changes: 25 additions & 11 deletions examples/process_video.py
@@ -1,40 +1,54 @@
"""
Video generation example using the Queue API.
Video models only support async queue processing.
"""

import asyncio
import os
from decart import DecartClient, models


async def main() -> None:
async with DecartClient(api_key=os.getenv("DECART_API_KEY", "your-api-key-here")) as client:
# Text-to-video generation
print("Generating video from text...")
result = await client.process(
result = await client.queue.submit_and_poll(
{
"model": models.video("lucy-pro-t2v"),
"prompt": "A serene lake at sunset with mountains in the background",
"seed": 42,
"on_status_change": lambda job: print(f" Status: {job.status}"),
}
)

with open("output_t2v.mp4", "wb") as f:
f.write(result)

print("Video saved to output_t2v.mp4")
if result.status == "completed":
with open("output_t2v.mp4", "wb") as f:
f.write(result.data)
print("Video saved to output_t2v.mp4")
else:
print(f"Text-to-video failed: {result.error}")
return

print("Transforming video...")
# Video-to-video transformation
print("\nTransforming video...")
with open("output_t2v.mp4", "rb") as video_file:
result = await client.process(
result = await client.queue.submit_and_poll(
{
"model": models.video("lucy-pro-v2v"),
"prompt": "Anime style with vibrant colors",
"data": video_file,
"enhance_prompt": True,
"num_inference_steps": 50,
"on_status_change": lambda job: print(f" Status: {job.status}"),
}
)

with open("output_v2v.mp4", "wb") as f:
f.write(result)

print("Video saved to output_v2v.mp4")
if result.status == "completed":
with open("output_v2v.mp4", "wb") as f:
f.write(result.data)
print("Video saved to output_v2v.mp4")
else:
print(f"Video-to-video failed: {result.error}")


if __name__ == "__main__":