Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion content-gen/infra/main.bicep
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ module applicationInsights 'br/public:avm/res/insights/component:0.7.1' = if (en
disableIpMasking: false
flowType: 'Bluefield'
workspaceResourceId: logAnalyticsWorkspaceResourceId
diagnosticSettings: [{ workspaceResourceId: logAnalyticsWorkspaceResourceId }]
}
}

Expand Down Expand Up @@ -869,6 +868,8 @@ module containerInstance 'modules/container-instance.bicep' = {
{ name: 'AZURE_AI_PROJECT_ENDPOINT', value: aiFoundryAiProjectEndpoint }
{ name: 'AZURE_AI_MODEL_DEPLOYMENT_NAME', value: gptModelName }
{ name: 'AZURE_AI_IMAGE_MODEL_DEPLOYMENT', value: imageModelConfig[imageModelChoice].name }
// Application Insights
{ name: 'APPLICATIONINSIGHTS_CONNECTION_STRING', value: enableMonitoring ? applicationInsights!.outputs.connectionString : '' }
]
}
}
Expand Down
16 changes: 7 additions & 9 deletions content-gen/infra/main.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"_generator": {
"name": "bicep",
"version": "0.41.2.15936",
"templateHash": "15058520269193157842"
"templateHash": "11360056711318509175"
},
"name": "Intelligent Content Generation Accelerator",
"description": "Solution Accelerator for multimodal marketing content generation using Microsoft Agent Framework.\n"
Expand Down Expand Up @@ -3562,14 +3562,7 @@
"flowType": {
"value": "Bluefield"
},
"workspaceResourceId": "[if(variables('useExistingLogAnalytics'), createObject('value', parameters('existingLogAnalyticsWorkspaceId')), if(parameters('enableMonitoring'), createObject('value', reference('logAnalyticsWorkspace').outputs.resourceId.value), createObject('value', '')))]",
"diagnosticSettings": {
"value": [
{
"workspaceResourceId": "[if(variables('useExistingLogAnalytics'), parameters('existingLogAnalyticsWorkspaceId'), if(parameters('enableMonitoring'), reference('logAnalyticsWorkspace').outputs.resourceId.value, ''))]"
}
]
}
"workspaceResourceId": "[if(variables('useExistingLogAnalytics'), createObject('value', parameters('existingLogAnalyticsWorkspaceId')), if(parameters('enableMonitoring'), createObject('value', reference('logAnalyticsWorkspace').outputs.resourceId.value), createObject('value', '')))]"
},
"template": {
"$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
Expand Down Expand Up @@ -33364,6 +33357,10 @@
{
"name": "AZURE_AI_IMAGE_MODEL_DEPLOYMENT",
"value": "[variables('imageModelConfig')[parameters('imageModelChoice')].name]"
},
{
"name": "APPLICATIONINSIGHTS_CONNECTION_STRING",
"value": "[if(parameters('enableMonitoring'), reference('applicationInsights').outputs.connectionString.value, '')]"
}
]
}
Expand Down Expand Up @@ -33561,6 +33558,7 @@
},
"dependsOn": [
"aiFoundryAiServicesProject",
"applicationInsights",
"userAssignedIdentity",
"virtualNetwork"
]
Expand Down
101 changes: 100 additions & 1 deletion content-gen/src/backend/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from quart import Quart, request, jsonify, Response
from quart_cors import cors
from opentelemetry import trace

from settings import app_settings
from models import CreativeBrief, Product
Expand All @@ -24,6 +25,9 @@
from services.title_service import get_title_service
from services.routing_service import get_routing_service, Intent, ConversationState
from api.admin import admin_bp
from azure.monitor.opentelemetry import configure_azure_monitor
from opentelemetry.instrumentation.asgi import OpenTelemetryMiddleware
from event_utils import track_event_if_configured

# In-memory task storage for generation tasks
# In production, this should be replaced with Redis or similar
Expand All @@ -36,17 +40,75 @@
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# NOTE(review): this suppression is repeated inside the monitoring branch below
# (harmless, but one of the two lines is redundant) — consider deduplicating.
logging.getLogger("azure.core.pipeline.policies.http_logging_policy").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)

# Create Quart app
app = Quart(__name__)
# Allow all origins; presumably the app sits behind an ingress that scopes access — TODO confirm.
app = cors(app, allow_origin="*")

# Check if the Application Insights connection string is set in the environment variables
appinsights_connection_string = os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING")
if appinsights_connection_string:
    # Configure Application Insights if the connection string is found
    configure_azure_monitor(
        connection_string=appinsights_connection_string,
        enable_live_metrics=False,
        enable_performance_counters=False,
    )
    # Suppress verbose Azure SDK INFO logs from App Insights
    # WARNING/ERROR/CRITICAL from these loggers still come through
    logging.getLogger("azure.core.pipeline.policies.http_logging_policy").setLevel(logging.WARNING)
    logging.getLogger("azure.monitor.opentelemetry.exporter").setLevel(logging.WARNING)
    logging.getLogger("azure.identity").setLevel(logging.WARNING)
    logging.getLogger("azure.cosmos").setLevel(logging.WARNING)
    logging.getLogger("api.admin").setLevel(logging.WARNING)
    logging.getLogger("httpx").setLevel(logging.WARNING)
    # Apply ASGI middleware for request tracing (Quart is not auto-instrumented by configure_azure_monitor)
    # Exclude health probes, post-deploy admin calls, and polling endpoints from telemetry
    app.asgi_app = OpenTelemetryMiddleware(
        app.asgi_app,
        exclude_spans=["receive", "send"],
        excluded_urls="health,api/admin,api/generate/status",
    )
    logger.info("Application Insights configured with the provided connection string")
else:
    # Log a warning if the connection string is not found
    logger.warning("No Application Insights connection string found. Skipping configuration")

# Register blueprints
app.register_blueprint(admin_bp)


@app.before_request
async def set_conversation_context():
    """Tag the active OpenTelemetry span with conversation_id and user_id.

    Identifiers are resolved in priority order: JSON request body, matched
    URL path parameters, then the query string; the user id may also come
    from the ``X-Ms-Client-Principal-Id`` header, defaulting to "anonymous".
    """
    conv_id, uid = "", ""

    # JSON bodies (typically POST requests) may carry both ids explicitly.
    content_type = request.content_type
    if content_type and "json" in content_type:
        payload = await request.get_json(silent=True)
        if isinstance(payload, dict) and payload:
            conv_id = payload.get("conversation_id", "")
            uid = payload.get("user_id", "")

    # Fall back to path params, e.g. /api/conversations/<conversation_id>.
    if not conv_id and request.view_args:
        conv_id = request.view_args.get("conversation_id", "")

    # Last resort: query string, e.g. ?conversation_id=xxx.
    if not conv_id:
        conv_id = request.args.get("conversation_id", "")

    if not uid:
        uid = request.args.get("user_id", "") or request.headers.get("X-Ms-Client-Principal-Id", "anonymous")

    active_span = trace.get_current_span()
    if active_span.is_recording():
        active_span.set_attribute("conversation_id", conv_id)
        active_span.set_attribute("user_id", uid)


# ==================== Authentication Helper ====================

def get_authenticated_user():
Expand Down Expand Up @@ -98,6 +160,8 @@ async def handle_chat():
selected_products = data.get("selected_products", [])
brief_data = data.get("brief", {})

track_event_if_configured("Chat_Request_Received", {"conversation_id": conversation_id, "user_id": user_id})

# Get services
routing_service = get_routing_service()
orchestrator = get_orchestrator()
Expand Down Expand Up @@ -212,6 +276,7 @@ async def handle_chat():

except Exception as e:
logger.exception(f"Error handling message: {e}")
track_event_if_configured("Error_Chat_Handler", {"conversation_id": conversation_id, "user_id": user_id, "error": str(e)})
return jsonify({
"action_type": "error",
"message": f"An error occurred: {str(e)}",
Expand All @@ -230,6 +295,8 @@ async def _handle_parse_brief(
) -> Response:
"""Handle parsing a new brief from user message."""

track_event_if_configured("Brief_Parse_Request", {"conversation_id": conversation_id, "user_id": user_id})

generated_title = None

# Save user message
Expand Down Expand Up @@ -261,6 +328,7 @@ async def _handle_parse_brief(
brief, questions, blocked = await orchestrator.parse_brief(message)

if blocked:
track_event_if_configured("Error_RAI_Check_Failed", {"conversation_id": conversation_id, "user_id": user_id, "status": "Brief parse blocked by RAI"})
# Content was blocked by RAI - save refusal as assistant response
try:
cosmos_service = await get_cosmos_service()
Expand Down Expand Up @@ -371,8 +439,11 @@ async def _handle_confirm_brief(
try:
brief = CreativeBrief(**brief_data)
except Exception as e:
track_event_if_configured("Error_Brief_Invalid_Format", {"conversation_id": conversation_id, "user_id": user_id, "error": str(e)})
return jsonify({"error": f"Invalid brief format: {str(e)}"}), 400

track_event_if_configured("Brief_Confirmed", {"conversation_id": conversation_id, "user_id": user_id})

try:
cosmos_service = await get_cosmos_service()

Expand Down Expand Up @@ -417,6 +488,8 @@ async def _handle_refine_brief(
) -> Response:
"""Handle brief refinement based on user feedback."""

track_event_if_configured("Brief_Refine_Request", {"conversation_id": conversation_id, "user_id": user_id})

# Get existing brief if available
existing_brief = conversation.get("brief") if conversation else None

Expand All @@ -439,6 +512,7 @@ async def _handle_refine_brief(
brief, questions, blocked = await orchestrator.parse_brief(message)

if blocked:
track_event_if_configured("Error_RAI_Check_Failed", {"conversation_id": conversation_id, "user_id": user_id, "status": "Brief refinement blocked by RAI"})
return jsonify({
"action_type": "rai_blocked",
"message": questions,
Expand Down Expand Up @@ -547,6 +621,8 @@ async def _handle_search_products(
) -> Response:
"""Handle product search/selection via natural language."""

track_event_if_configured("Product_Selection_Request", {"conversation_id": conversation_id, "user_id": user_id})

# Save user message
try:
cosmos_service = await get_cosmos_service()
Expand Down Expand Up @@ -626,6 +702,7 @@ async def _handle_generate_content(
try:
brief = CreativeBrief(**brief_data)
except Exception as e:
track_event_if_configured("Error_Generation_Invalid_Brief", {"conversation_id": conversation_id, "user_id": user_id, "error": str(e)})
return jsonify({
"action_type": "error",
"message": f"Invalid brief format: {str(e)}",
Expand All @@ -645,6 +722,8 @@ async def _handle_generate_content(
"error": None
}

track_event_if_configured("Generation_Started", {"task_id": task_id, "conversation_id": conversation_id, "user_id": user_id, "generate_images": str(generate_images)})

# Save user request
try:
cosmos_service = await get_cosmos_service()
Expand Down Expand Up @@ -694,6 +773,9 @@ async def _handle_modify_image(
selected_products: list = None
) -> Response:
"""Handle image modification requests."""

track_event_if_configured("Regeneration_Request", {"conversation_id": conversation_id, "user_id": user_id})

# Get products from frontend (frontend handles product detection)
# This matches the original implementation where frontend detected product changes
frontend_products = selected_products or []
Expand Down Expand Up @@ -729,6 +811,7 @@ async def _handle_modify_image(
generated_content = conversation.get("generated_content") if conversation else None

if not generated_content:
track_event_if_configured("Error_Regeneration_No_Content", {"conversation_id": conversation_id, "user_id": user_id})
return jsonify({
"action_type": "error",
"message": "No generated content found. Please generate content first.",
Expand Down Expand Up @@ -756,6 +839,7 @@ async def _handle_modify_image(
brief = None

if not brief:
track_event_if_configured("Error_Regeneration_No_Brief", {"conversation_id": conversation_id, "user_id": user_id})
return jsonify({
"action_type": "error",
"message": "No brief found. Please create and confirm a brief first.",
Expand Down Expand Up @@ -836,6 +920,7 @@ async def _run_regeneration_task(

# Check for RAI block
if response.get("rai_blocked"):
track_event_if_configured("Error_RAI_Check_Failed", {"conversation_id": conversation_id, "user_id": user_id, "status": "Regeneration blocked by RAI"})
_generation_tasks[task_id]["status"] = "failed"
_generation_tasks[task_id]["error"] = response.get("error", "Request blocked by content safety")
return
Expand Down Expand Up @@ -958,6 +1043,7 @@ async def _run_regeneration_task(
"selected_products": products_data,
"updated_brief": updated_brief_dict, # Include updated brief for frontend
}
track_event_if_configured("Regeneration_Completed", {"task_id": task_id, "conversation_id": conversation_id, "user_id": user_id})

# Clear active regeneration marker (only if it's still our task)
active_info = _active_regenerations.get(conversation_id, {})
Expand All @@ -969,6 +1055,7 @@ async def _run_regeneration_task(
logger.exception(f"Error in regeneration task {task_id}: {e}")
_generation_tasks[task_id]["status"] = "failed"
_generation_tasks[task_id]["error"] = str(e)
track_event_if_configured("Error_Regeneration_Failed", {"task_id": task_id, "conversation_id": conversation_id, "user_id": user_id, "error": str(e)})
# Clear active regeneration marker on error too
active_info = _active_regenerations.get(conversation_id, {})
if active_info.get("task_id") == task_id:
Expand All @@ -981,6 +1068,8 @@ async def _handle_start_over(
) -> Response:
"""Handle start over request - clears the current session."""

track_event_if_configured("Session_Reset", {"conversation_id": conversation_id, "user_id": user_id})

# For start over, we create a new conversation
new_conversation_id = str(uuid.uuid4())

Expand All @@ -1002,6 +1091,8 @@ async def _handle_general_chat(
) -> Response:
"""Handle general chat messages."""

track_event_if_configured("General_Chat_Request", {"conversation_id": conversation_id, "user_id": user_id})

# Save user message
try:
cosmos_service = await get_cosmos_service()
Expand Down Expand Up @@ -1145,13 +1236,15 @@ async def _run_generation_task(task_id: str, brief: CreativeBrief, products_data
_generation_tasks[task_id]["status"] = "completed"
_generation_tasks[task_id]["result"] = response
_generation_tasks[task_id]["completed_at"] = datetime.now(timezone.utc).isoformat()
track_event_if_configured("Generation_Completed", {"task_id": task_id, "conversation_id": conversation_id, "user_id": user_id})
logger.info(f"Task {task_id} marked as completed")

except Exception as e:
logger.exception(f"Generation task {task_id} failed: {e}")
_generation_tasks[task_id]["status"] = "failed"
_generation_tasks[task_id]["error"] = str(e)
_generation_tasks[task_id]["completed_at"] = datetime.now(timezone.utc).isoformat()
track_event_if_configured("Error_Generation_Failed", {"task_id": task_id, "conversation_id": conversation_id, "user_id": user_id, "error": str(e)})


@app.route("/api/generate/start", methods=["POST"])
Expand Down Expand Up @@ -1187,6 +1280,7 @@ async def start_generation():
try:
brief = CreativeBrief(**brief_data)
except Exception as e:
track_event_if_configured("Error_Generation_Invalid_Brief", {"conversation_id": conversation_id, "user_id": user_id, "error": str(e)})
return jsonify({"error": f"Invalid brief format: {str(e)}"}), 400

# Create task ID
Expand Down Expand Up @@ -1229,6 +1323,8 @@ async def start_generation():

logger.info(f"Started generation task {task_id} for conversation {conversation_id}")

track_event_if_configured("Generation_Started", {"task_id": task_id, "conversation_id": conversation_id, "user_id": user_id, "generate_images": str(generate_images)})

return jsonify({
"task_id": task_id,
"status": "pending",
Expand Down Expand Up @@ -1555,6 +1651,7 @@ async def delete_conversation(conversation_id: str):
try:
cosmos_service = await get_cosmos_service()
await cosmos_service.delete_conversation(conversation_id, user_id)
track_event_if_configured("Conversation_Deleted", {"conversation_id": conversation_id, "user_id": user_id})
return jsonify({"success": True, "message": "Conversation deleted"})
except Exception as e:
logger.warning(f"Failed to delete conversation: {e}")
Expand Down Expand Up @@ -1586,6 +1683,7 @@ async def update_conversation(conversation_id: str):
cosmos_service = await get_cosmos_service()
result = await cosmos_service.rename_conversation(conversation_id, user_id, new_title)
if result:
track_event_if_configured("Conversation_Renamed", {"conversation_id": conversation_id, "user_id": user_id})
return jsonify({"success": True, "message": "Conversation renamed", "title": new_title})
return jsonify({"error": "Conversation not found"}), 404
except Exception as e:
Expand All @@ -1606,6 +1704,7 @@ async def delete_all_conversations():
try:
cosmos_service = await get_cosmos_service()
deleted_count = await cosmos_service.delete_all_conversations(user_id)
track_event_if_configured("Conversations_All_Deleted", {"user_id": user_id, "deleted_count": str(deleted_count)})
return jsonify({
"success": True,
"message": f"Deleted {deleted_count} conversations",
Expand Down
11 changes: 11 additions & 0 deletions content-gen/src/backend/event_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import logging
import os
from azure.monitor.events.extension import track_event


def track_event_if_configured(event_name: str, event_data: dict) -> None:
    """Send a custom event to Application Insights if telemetry is configured.

    Args:
        event_name: Name of the custom event as it should appear in App Insights.
        event_data: Flat dictionary of event properties to attach to the event.

    The connection string is re-read from the environment on every call; when
    it is absent the event is skipped and a warning is logged instead, so
    callers never need to guard their tracking calls.
    """
    connection_string = os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING")
    if connection_string:
        track_event(event_name, event_data)
    else:
        # Lazy %-style args avoid building the message when WARNING is filtered out.
        logging.warning(
            "Skipping track_event for %s as Application Insights is not configured",
            event_name,
        )
5 changes: 5 additions & 0 deletions content-gen/src/backend/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ openai>=1.45.0
# HTTP Client (for Foundry direct API calls)
httpx>=0.27.0

# Monitoring / Telemetry
azure-monitor-opentelemetry>=1.6.0
azure-monitor-events-extension>=0.1.0
opentelemetry-instrumentation-asgi>=0.48b0

# Data Validation
pydantic>=2.8.0
pydantic-settings>=2.4.0
Expand Down
Loading