From 424ca2eaeb7051eac5d1b48564d588bcf4eff063 Mon Sep 17 00:00:00 2001 From: Andrei Bratu Date: Sun, 5 Jan 2025 18:16:23 +0000 Subject: [PATCH 01/11] draft --- .gitignore | 2 +- src/humanloop/client.py | 1 - src/humanloop/eval_utils/run.py | 5 - src/humanloop/otel/__init__.py | 10 +- src/humanloop/otel/constants.py | 2 + src/humanloop/otel/exporter.py | 128 +++---- src/humanloop/otel/helpers.py | 12 +- src/humanloop/otel/processor.py | 29 +- src/humanloop/prompts/client.py | 171 +++++----- src/humanloop/utilities/__init__.py | 0 src/humanloop/utilities/flow.py | 89 +++++ src/humanloop/utilities/helpers.py | 21 ++ src/humanloop/utilities/prompt.py | 88 +++++ src/humanloop/utilities/tool.py | 505 ++++++++++++++++++++++++++++ src/humanloop/utilities/types.py | 12 + 15 files changed, 901 insertions(+), 174 deletions(-) create mode 100644 src/humanloop/utilities/__init__.py create mode 100644 src/humanloop/utilities/flow.py create mode 100644 src/humanloop/utilities/helpers.py create mode 100644 src/humanloop/utilities/prompt.py create mode 100644 src/humanloop/utilities/tool.py create mode 100644 src/humanloop/utilities/types.py diff --git a/.gitignore b/.gitignore index 6463b520..063f6123 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,4 @@ __pycache__/ poetry.toml .ruff_cache/ .vscode -.env \ No newline at end of file +.env diff --git a/src/humanloop/client.py b/src/humanloop/client.py index 1663e7ed..2d582dbf 100644 --- a/src/humanloop/client.py +++ b/src/humanloop/client.py @@ -49,7 +49,6 @@ def run( name: Optional[str], dataset: Dataset, evaluators: Optional[Sequence[Evaluator]] = None, - # logs: typing.Sequence[dict] | None = None, workers: int = 4, ) -> List[EvaluatorCheck]: """Evaluate your function for a given `Dataset` and set of `Evaluators`. diff --git a/src/humanloop/eval_utils/run.py b/src/humanloop/eval_utils/run.py index ff6dce61..3d1a5c9e 100644 --- a/src/humanloop/eval_utils/run.py +++ b/src/humanloop/eval_utils/run.py @@ -212,10 +212,6 @@ def increment(self): sys.stderr.write("\n") -# Module-level so it can be shared by threads. -_PROGRESS_BAR: Optional[_SimpleProgressBar] = None - - def run_eval( client: "BaseHumanloop", file: File, @@ -236,7 +232,6 @@ def run_eval( :param workers: the number of threads to process datapoints using your function concurrently. :return: per Evaluator checks. 
""" - global _PROGRESS_BAR if hasattr(file["callable"], "file"): # When the decorator inside `file` is a decorated function, diff --git a/src/humanloop/otel/__init__.py b/src/humanloop/otel/__init__.py index 0a1eab92..82fd0c68 100644 --- a/src/humanloop/otel/__init__.py +++ b/src/humanloop/otel/__init__.py @@ -2,6 +2,7 @@ from opentelemetry.sdk.trace import TracerProvider from typing_extensions import NotRequired +from opentelemetry.sdk.trace import TracerProvider from humanloop.otel.helpers import module_is_installed @@ -41,12 +42,3 @@ def instrument_provider(provider: TracerProvider): from opentelemetry.instrumentation.bedrock import BedrockInstrumentor BedrockInstrumentor().instrument(tracer_provider=provider) - - -class FlowContext(TypedDict): - trace_id: NotRequired[str] - trace_parent_id: NotRequired[Optional[int]] - is_flow_log: NotRequired[bool] - - -TRACE_FLOW_CONTEXT: dict[int, FlowContext] = {} diff --git a/src/humanloop/otel/constants.py b/src/humanloop/otel/constants.py index d28126a0..06de824d 100644 --- a/src/humanloop/otel/constants.py +++ b/src/humanloop/otel/constants.py @@ -4,3 +4,5 @@ HUMANLOOP_LOG_KEY = "humanloop.log" HUMANLOOP_FILE_TYPE_KEY = "humanloop.file.type" HUMANLOOP_PATH_KEY = "humanloop.file.path" +# Required for the exporter to know when to mark the Flow Log as complete +HUMANLOOP_FLOW_PREREQUISITES_KEY = "humanloop.flow.prerequisites" diff --git a/src/humanloop/otel/exporter.py b/src/humanloop/otel/exporter.py index 7e7de7e5..3417fe1b 100644 --- a/src/humanloop/otel/exporter.py +++ b/src/humanloop/otel/exporter.py @@ -1,7 +1,7 @@ import contextvars -import json import logging import threading +import time import typing from queue import Empty as EmptyQueue from queue import Queue @@ -14,16 +14,18 @@ from humanloop.core import ApiError as HumanloopApiError from humanloop.eval_utils.context import EVALUATION_CONTEXT_VARIABLE_NAME, EvaluationContext -from humanloop.otel import TRACE_FLOW_CONTEXT, FlowContext +from humanloop.otel import TRACE_FLOW_CONTEXT from humanloop.otel.constants import ( HUMANLOOP_FILE_KEY, HUMANLOOP_FILE_TYPE_KEY, + HUMANLOOP_FLOW_PREREQUISITES_KEY, HUMANLOOP_LOG_KEY, HUMANLOOP_PATH_KEY, ) from humanloop.otel.helpers import is_humanloop_span, read_from_opentelemetry_span from humanloop.requests.flow_kernel_request import FlowKernelRequestParams from humanloop.requests.prompt_kernel_request import PromptKernelRequestParams +from humanloop.requests.tool_kernel_request import ToolKernelRequestParams if typing.TYPE_CHECKING: from humanloop.client import Humanloop @@ -69,7 +71,8 @@ def __init__( for thread in self._threads: thread.start() logger.debug("Exporter Thread %s started", thread.ident) - self._flow_logs_to_complete: list[str] = [] + # Flow Log Span ID mapping to children Spans that must be uploaded first + self._flow_log_prerequisites: dict[int, set[int]] = {} def export(self, spans: trace.Sequence[ReadableSpan]) -> SpanExportResult: def is_evaluated_file( @@ -133,11 +136,6 @@ def shutdown(self) -> None: for thread in self._threads: thread.join() logger.debug("Exporter Thread %s joined", thread.ident) - for log_id in self._flow_logs_to_complete: - self._client.flows.update_log( - log_id=log_id, - trace_status="complete", - ) def force_flush(self, timeout_millis: int = 3000) -> bool: self._shutdown = True @@ -211,9 +209,22 @@ def _do_work(self): self._upload_queue.put((span_to_export, evaluation_context)) self._upload_queue.task_done() + def _complete_flow_log(self, span_id: int) -> None: + for flow_log_span_id, 
flow_children_span_ids in self._flow_log_prerequisites.items(): + if span_id in flow_children_span_ids: + flow_children_span_ids.remove(span_id) + if len(flow_children_span_ids) == 0: + flow_log_id = self._span_id_to_uploaded_log_id[flow_log_span_id] + self._client.flows.update_log(log_id=flow_log_id, trace_status="complete") + break + def _export_span_dispatch(self, span: ReadableSpan) -> None: hl_file = read_from_opentelemetry_span(span, key=HUMANLOOP_FILE_KEY) file_type = span._attributes.get(HUMANLOOP_FILE_TYPE_KEY) # type: ignore + parent_span_id = span.parent.span_id if span.parent else None + + while parent_span_id and self._span_id_to_uploaded_log_id.get(parent_span_id) is None: + time.sleep(0.1) if file_type == "prompt": export_func = self._export_prompt @@ -242,25 +253,16 @@ def _export_prompt(self, span: ReadableSpan) -> None: log_object["messages"] = [] if "tools" not in file_object["prompt"]: file_object["prompt"]["tools"] = [] - trace_metadata = TRACE_FLOW_CONTEXT.get(span.get_span_context().span_id) - if trace_metadata and "trace_parent_id" in trace_metadata and trace_metadata["trace_parent_id"]: - trace_parent_id = self._span_id_to_uploaded_log_id[trace_metadata["trace_parent_id"]] - if trace_parent_id is None: - # Parent Log in Trace upload failed - file_path = read_from_opentelemetry_span(span, key=HUMANLOOP_PATH_KEY) - logger.error(f"Skipping log for {file_path}: parent Log upload failed") - return - else: - trace_parent_id = None - prompt: PromptKernelRequestParams = file_object["prompt"] + path: str = file_object["path"] - if "output" in log_object: - if not isinstance(log_object["output"], str): - # Output expected to be a string, if decorated function - # does not return one, jsonify it - log_object["output"] = json.dumps(log_object["output"]) + prompt: PromptKernelRequestParams = file_object["prompt"] + + span_parent_id = span.parent.span_id if span.parent else None + trace_parent_id = self._span_id_to_uploaded_log_id[span_parent_id] if span_parent_id else None + if "attributes" not in prompt or not prompt["attributes"]: prompt["attributes"] = {} + try: log_response = self._client.prompts.log( path=path, @@ -271,34 +273,32 @@ def _export_prompt(self, span: ReadableSpan) -> None: self._span_id_to_uploaded_log_id[span.context.span_id] = log_response.id except HumanloopApiError: self._span_id_to_uploaded_log_id[span.context.span_id] = None + self._complete_flow_log(span_id=span.context.span_id) def _export_tool(self, span: ReadableSpan) -> None: - file_object: dict[str, Any] = read_from_opentelemetry_span(span, key=HUMANLOOP_FILE_KEY) - log_object: dict[str, Any] = read_from_opentelemetry_span(span, key=HUMANLOOP_LOG_KEY) - trace_metadata: FlowContext = TRACE_FLOW_CONTEXT.get(span.get_span_context().span_id, {}) - if "trace_parent_id" in trace_metadata and trace_metadata["trace_parent_id"]: - trace_parent_id = self._span_id_to_uploaded_log_id.get( - trace_metadata["trace_parent_id"], - ) - if trace_parent_id is None: - # Parent Log in Trace upload failed - file_path = read_from_opentelemetry_span(span, key=HUMANLOOP_PATH_KEY) - logger.error(f"Skipping log for {file_path}: parent Log upload failed") - return - else: - trace_parent_id = None - tool = file_object["tool"] + file_object: dict[str, Any] = read_from_opentelemetry_span( + span, + key=HUMANLOOP_FILE_KEY, + ) + log_object: dict[str, Any] = read_from_opentelemetry_span( + span, + key=HUMANLOOP_LOG_KEY, + ) + + path: str = file_object["path"] + tool: ToolKernelRequestParams = file_object["tool"] + + span_parent_id 
= span.parent.span_id if span.parent else None + trace_parent_id = self._span_id_to_uploaded_log_id[span_parent_id] if span_parent_id else None + + # API expects an empty dictionary if user does not supply attributes if not tool.get("attributes"): tool["attributes"] = {} if not tool.get("setup_values"): tool["setup_values"] = {} - path: str = file_object["path"] if "parameters" in tool["function"] and "properties" not in tool["function"]["parameters"]: tool["function"]["parameters"]["properties"] = {} - if not isinstance(log_object["output"], str): - # Output expected to be a string, if decorated function - # does not return one, jsonify it - log_object["output"] = json.dumps(log_object["output"]) + try: log_response = self._client.tools.log( path=path, @@ -309,33 +309,34 @@ def _export_tool(self, span: ReadableSpan) -> None: self._span_id_to_uploaded_log_id[span.context.span_id] = log_response.id except HumanloopApiError: self._span_id_to_uploaded_log_id[span.context.span_id] = None + self._complete_flow_log(span_id=span.context.span_id) def _export_flow(self, span: ReadableSpan) -> None: - file_object: dict[str, Any] = read_from_opentelemetry_span(span, key=HUMANLOOP_FILE_KEY) - log_object: dict[str, Any] = read_from_opentelemetry_span(span, key=HUMANLOOP_LOG_KEY) - trace_metadata: FlowContext = TRACE_FLOW_CONTEXT.get( - span.get_span_context().span_id, - {}, + file_object: dict[str, Any] = read_from_opentelemetry_span( + span, + key=HUMANLOOP_FILE_KEY, ) - if "trace_parent_id" in trace_metadata: - trace_parent_id = self._span_id_to_uploaded_log_id.get( - trace_metadata["trace_parent_id"], # type: ignore - ) - if trace_parent_id is None and trace_metadata["trace_id"] != span.get_span_context().span_id: - # Parent Log in Trace upload failed - # NOTE: Check if the trace_id metadata field points to the - # span itself. 
This signifies the span is the head of the Trace - file_path = read_from_opentelemetry_span(span, key=HUMANLOOP_PATH_KEY) - logger.error(f"Skipping log for {file_path}: parent Log upload failed") - return - else: - trace_parent_id = None + log_object: dict[str, Any] = read_from_opentelemetry_span( + span, + key=HUMANLOOP_LOG_KEY, + ) + # Spans that must be uploaded before the Flow Span is completed + prerequisites = read_from_opentelemetry_span( + span=span, + key=HUMANLOOP_FLOW_PREREQUISITES_KEY, + ) + self._flow_log_prerequisites[span.context.span_id] = set(prerequisites) + + path: str = file_object["path"] flow: FlowKernelRequestParams if not file_object.get("flow"): flow = {"attributes": {}} else: flow = file_object["flow"] - path: str = file_object["path"] + + span_parent_id = span.parent.span_id if span.parent else None + trace_parent_id = self._span_id_to_uploaded_log_id[span_parent_id] if span_parent_id else None + if "output" not in log_object: log_object["output"] = None try: @@ -350,3 +351,4 @@ def _export_flow(self, span: ReadableSpan) -> None: except HumanloopApiError as e: logger.error(str(e)) self._span_id_to_uploaded_log_id[span.context.span_id] = None + self._complete_flow_log(span_id=span.context.span_id) diff --git a/src/humanloop/otel/helpers.py b/src/humanloop/otel/helpers.py index d25a5674..c620d577 100644 --- a/src/humanloop/otel/helpers.py +++ b/src/humanloop/otel/helpers.py @@ -267,13 +267,7 @@ def is_llm_provider_call(span: ReadableSpan) -> bool: def is_humanloop_span(span: ReadableSpan) -> bool: """Check if the Span was created by the Humanloop SDK.""" - try: - # Valid spans will have keys with the HL_FILE_OT_KEY and HL_LOG_OT_KEY prefixes present - read_from_opentelemetry_span(span, key=HUMANLOOP_FILE_KEY) - read_from_opentelemetry_span(span, key=HUMANLOOP_LOG_KEY) - except KeyError: - return False - return True + return span.name.startswith("humanloop.") def module_is_installed(module_name: str) -> bool: @@ -288,10 +282,6 @@ def module_is_installed(module_name: str) -> bool: return True -def generate_span_id() -> str: - return str(uuid.uuid4()) - - def jsonify_if_not_string(func: Callable, output: Any) -> str: if not isinstance(output, str): try: diff --git a/src/humanloop/otel/processor.py b/src/humanloop/otel/processor.py index 3542c244..b42ec40b 100644 --- a/src/humanloop/otel/processor.py +++ b/src/humanloop/otel/processor.py @@ -2,12 +2,16 @@ from collections import defaultdict from typing import Any -# No typing stubs for parse from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.export import SimpleSpanProcessor, SpanExporter from pydantic import ValidationError as PydanticValidationError -from humanloop.otel.constants import HUMANLOOP_FILE_KEY, HUMANLOOP_FILE_TYPE_KEY, HUMANLOOP_LOG_KEY +from humanloop.otel.constants import ( + HUMANLOOP_FILE_KEY, + HUMANLOOP_FILE_TYPE_KEY, + HUMANLOOP_FLOW_PREREQUISITES_KEY, + HUMANLOOP_LOG_KEY, +) from humanloop.otel.helpers import ( is_humanloop_span, is_llm_provider_call, @@ -40,10 +44,17 @@ def __init__(self, exporter: SpanExporter) -> None: super().__init__(exporter) # Span parent to Span children map self._children: dict[int, list] = defaultdict(list) - - # NOTE: Could override on_start and process Flow spans ahead of time - # and PATCH the created Logs in on_end. 
A special type of ReadableSpan could be - # used for this + self._prerequisites: dict[int, list[int]] = {} + + def on_start(self, span, parent_context=None): + span_id = span.context.span_id + if span.name == "humanloop.flow": + self._prerequisites[span_id] = [] + if span.parent and is_humanloop_span(span): + parent_span_id = span.parent.span_id + for trace_head, all_trace_nodes in self._prerequisites.items(): + if parent_span_id == trace_head or parent_span_id in all_trace_nodes: + all_trace_nodes.append(span_id) def on_end(self, span: ReadableSpan) -> None: if is_humanloop_span(span=span): @@ -57,6 +68,12 @@ def on_end(self, span: ReadableSpan) -> None: # arrives in order to enrich it self._children[span.parent.span_id].append(span) # Pass the Span to the Exporter + if span.name == "humanloop.flow": + write_to_opentelemetry_span( + span=span, + key=HUMANLOOP_FLOW_PREREQUISITES_KEY, + value=self._prerequisites[span.context.span_id], + ) self.span_exporter.export([span]) diff --git a/src/humanloop/prompts/client.py b/src/humanloop/prompts/client.py index e3f402b8..48950fdc 100644 --- a/src/humanloop/prompts/client.py +++ b/src/humanloop/prompts/client.py @@ -1229,6 +1229,12 @@ def upsert( provider="openai", max_tokens=-1, temperature=0.7, + top_p=1.0, + presence_penalty=0.0, + frequency_penalty=0.0, + other={}, + tools=[], + linked_tools=[], commit_message="Initial commit", ) """ @@ -3094,116 +3100,125 @@ async def upsert( request_options: typing.Optional[RequestOptions] = None, ) -> PromptResponse: """ - Create a Prompt or update it with a new version if it already exists. + Create a Prompt or update it with a new version if it already exists. - Prompts are identified by the `ID` or their `path`. The parameters (i.e. the prompt template, temperature, model etc.) determine the versions of the Prompt. + Prompts are identified by the `ID` or their `path`. The parameters (i.e. the prompt template, temperature, model etc.) determine the versions of the Prompt. - If you provide a commit message, then the new version will be committed; - otherwise it will be uncommitted. If you try to commit an already committed version, - an exception will be raised. + If you provide a commit message, then the new version will be committed; + otherwise it will be uncommitted. If you try to commit an already committed version, + an exception will be raised. - Parameters - ---------- - model : str - The model instance used, e.g. `gpt-4`. See [supported models](https://humanloop.com/docs/reference/supported-models) + Parameters + ---------- + model : str + The model instance used, e.g. `gpt-4`. See [supported models](https://humanloop.com/docs/reference/supported-models) - path : typing.Optional[str] - Path of the Prompt, including the name. This locates the Prompt in the Humanloop filesystem and is used as as a unique identifier. For example: `folder/name` or just `name`. + path : typing.Optional[str] + Path of the Prompt, including the name. This locates the Prompt in the Humanloop filesystem and is used as as a unique identifier. For example: `folder/name` or just `name`. - id : typing.Optional[str] - ID for an existing Prompt. + id : typing.Optional[str] + ID for an existing Prompt. - endpoint : typing.Optional[ModelEndpoints] - The provider model endpoint used. + endpoint : typing.Optional[ModelEndpoints] + The provider model endpoint used. 
- template : typing.Optional[PromptRequestTemplateParams] - The template contains the main structure and instructions for the model, including input variables for dynamic values. + template : typing.Optional[PromptRequestTemplateParams] + The template contains the main structure and instructions for the model, including input variables for dynamic values. - For chat models, provide the template as a ChatTemplate (a list of messages), e.g. a system message, followed by a user message with an input variable. - For completion models, provide a prompt template as a string. + For chat models, provide the template as a ChatTemplate (a list of messages), e.g. a system message, followed by a user message with an input variable. + For completion models, provide a prompt template as a string. - Input variables should be specified with double curly bracket syntax: `{{input_name}}`. + Input variables should be specified with double curly bracket syntax: `{{input_name}}`. - provider : typing.Optional[ModelProviders] - The company providing the underlying model service. + provider : typing.Optional[ModelProviders] + The company providing the underlying model service. - max_tokens : typing.Optional[int] - The maximum number of tokens to generate. Provide max_tokens=-1 to dynamically calculate the maximum number of tokens to generate given the length of the prompt + max_tokens : typing.Optional[int] + The maximum number of tokens to generate. Provide max_tokens=-1 to dynamically calculate the maximum number of tokens to generate given the length of the prompt - temperature : typing.Optional[float] - What sampling temperature to use when making a generation. Higher values means the model will be more creative. + temperature : typing.Optional[float] + What sampling temperature to use when making a generation. Higher values means the model will be more creative. - top_p : typing.Optional[float] - An alternative to sampling with temperature, called nucleus sampling, where the model considers the results of the tokens with top_p probability mass. + top_p : typing.Optional[float] + An alternative to sampling with temperature, called nucleus sampling, where the model considers the results of the tokens with top_p probability mass. - stop : typing.Optional[PromptRequestStopParams] - The string (or list of strings) after which the model will stop generating. The returned text will not contain the stop sequence. + stop : typing.Optional[PromptRequestStopParams] + The string (or list of strings) after which the model will stop generating. The returned text will not contain the stop sequence. - presence_penalty : typing.Optional[float] - Number between -2.0 and 2.0. Positive values penalize new tokens based on whether they appear in the generation so far. + presence_penalty : typing.Optional[float] + Number between -2.0 and 2.0. Positive values penalize new tokens based on whether they appear in the generation so far. - frequency_penalty : typing.Optional[float] - Number between -2.0 and 2.0. Positive values penalize new tokens based on how frequently they appear in the generation so far. + frequency_penalty : typing.Optional[float] + Number between -2.0 and 2.0. Positive values penalize new tokens based on how frequently they appear in the generation so far. - other : typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] - Other parameter values to be passed to the provider call. 
+ other : typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] + Other parameter values to be passed to the provider call. - seed : typing.Optional[int] - If specified, model will make a best effort to sample deterministically, but it is not guaranteed. + seed : typing.Optional[int] + If specified, model will make a best effort to sample deterministically, but it is not guaranteed. - response_format : typing.Optional[ResponseFormatParams] - The format of the response. Only `{"type": "json_object"}` is currently supported for chat. + response_format : typing.Optional[ResponseFormatParams] + The format of the response. Only `{"type": "json_object"}` is currently supported for chat. - tools : typing.Optional[typing.Sequence[ToolFunctionParams]] - The tool specification that the model can choose to call if Tool calling is supported. + tools : typing.Optional[typing.Sequence[ToolFunctionParams]] + The tool specification that the model can choose to call if Tool calling is supported. - linked_tools : typing.Optional[typing.Sequence[str]] - The IDs of the Tools in your organization that the model can choose to call if Tool calling is supported. The default deployed version of that tool is called. + linked_tools : typing.Optional[typing.Sequence[str]] + The IDs of the Tools in your organization that the model can choose to call if Tool calling is supported. The default deployed version of that tool is called. - attributes : typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] - Additional fields to describe the Prompt. Helpful to separate Prompt versions from each other with details on how they were created or used. + attributes : typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] + Additional fields to describe the Prompt. Helpful to separate Prompt versions from each other with details on how they were created or used. - commit_message : typing.Optional[str] - Message describing the changes made. + commit_message : typing.Optional[str] + Message describing the changes made. - request_options : typing.Optional[RequestOptions] - Request-specific configuration. + request_options : typing.Optional[RequestOptions] + Request-specific configuration. 
- Returns
- -------
- PromptResponse
- Successful Response
+ Returns
+ -------
+ PromptResponse
+ Successful Response
- Examples
- --------
- import asyncio
+ Examples
+ --------
+ import asyncio
- from humanloop import AsyncHumanloop
+ from humanloop import AsyncHumanloop
- client = AsyncHumanloop(
- api_key="YOUR_API_KEY",
- )
+ client = AsyncHumanloop(
+ api_key="YOUR_API_KEY",
+ )
- async def main() -> None:
- await client.prompts.upsert(
- path="Personal Projects/Coding Assistant",
- model="gpt-4o",
- endpoint="chat",
- template=[
- {
- "content": "You are a helpful coding assistant specialising in {{language}}",
- "role": "system",
- }
- ],
- provider="openai",
- max_tokens=-1,
- temperature=0.7,
- commit_message="Initial commit",
- )
+ async def main() -> None:
+ await client.prompts.upsert(
+ path="Personal Projects/Coding Assistant",
+ model="gpt-4o",
+ endpoint="chat",
+ template=[
+ {
+ "content": "You are a helpful coding assistant specialising in {{language}}",
+ "role": "system",
+ }
+ ],
+ provider="openai",
+ max_tokens=-1,
+ temperature=0.7,
+ top_p=1.0,
+ presence_penalty=0.0,
+ frequency_penalty=0.0,
+ other={},
+ tools=[],
+ linked_tools=[],
+ commit_message="Initial commit",
+ )
- asyncio.run(main())
+ asyncio.run(main())
 """
 _response = await self._client_wrapper.httpx_client.request(
 "prompts",
diff --git a/src/humanloop/utilities/__init__.py b/src/humanloop/utilities/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/src/humanloop/utilities/flow.py b/src/humanloop/utilities/flow.py
new file mode 100644
index 00000000..f63573ed
--- /dev/null
+++ b/src/humanloop/utilities/flow.py
@@ -0,0 +1,89 @@
+import logging
+from functools import wraps
+from typing import Any, Callable, Mapping, Optional, Sequence
+
+from opentelemetry.sdk.trace import Span
+from opentelemetry.trace import Tracer
+from typing_extensions import Unpack
+
+from humanloop.utilities.helpers import args_to_inputs
+from humanloop.eval_utils.types import File
+from humanloop.otel.constants import (
+    HUMANLOOP_FILE_KEY,
+    HUMANLOOP_FILE_TYPE_KEY,
+    HUMANLOOP_LOG_KEY,
+    HUMANLOOP_PATH_KEY,
+)
+from humanloop.otel.helpers import jsonify_if_not_string, write_to_opentelemetry_span
+from humanloop.requests import FlowKernelRequestParams as FlowDict
+from humanloop.requests.flow_kernel_request import FlowKernelRequestParams
+
+logger = logging.getLogger("humanloop.sdk")
+
+
+def flow(
+    opentelemetry_tracer: Tracer,
+    path: Optional[str] = None,
+    **flow_kernel: Unpack[FlowKernelRequestParams],  # type: ignore
+):
+    flow_kernel["attributes"] = {k: v for k, v in flow_kernel.get("attributes", {}).items() if v is not None}
+
+    def decorator(func: Callable):
+        @wraps(func)
+        def wrapper(*args: Sequence[Any], **kwargs: Mapping[str, Any]) -> Any:
+            span: Span
+            with opentelemetry_tracer.start_as_current_span("humanloop.flow") as span:  # type: ignore
+                span.set_attribute(HUMANLOOP_PATH_KEY, path if path else func.__name__)
+                span.set_attribute(HUMANLOOP_FILE_TYPE_KEY, "flow")
+
+                if flow_kernel:
+                    write_to_opentelemetry_span(
+                        span=span,
+                        key=f"{HUMANLOOP_FILE_KEY}.flow",
+                        value=flow_kernel,  # type: ignore
+                    )
+
+                # Call the decorated function
+                try:
+                    output = func(*args, **kwargs)
+                    output_stringified = jsonify_if_not_string(
+                        func=func,
+                        output=output,
+                    )
+                    error = None
+                except Exception as e:
+                    logger.error(f"Error calling {func.__name__}: {e}")
+                    output = None
+                    output_stringified = jsonify_if_not_string(
+                        func=func,
+                        output=None,
+                    )
+ error = str(e) + + flow_log = { + "inputs": args_to_inputs(func, args, kwargs), + "output": output_stringified, + "error": error, + } + + # Write the Flow Log to the Span on HL_LOG_OT_KEY + if flow_log: + write_to_opentelemetry_span( + span=span, + key=HUMANLOOP_LOG_KEY, + value=flow_log, # type: ignore + ) + + # Return the output of the decorated function + return output + + wrapper.file = File( # type: ignore + path=path if path else func.__name__, + type="flow", + version=FlowDict(**flow_kernel), # type: ignore + callable=wrapper, + ) + + return wrapper + + return decorator diff --git a/src/humanloop/utilities/helpers.py b/src/humanloop/utilities/helpers.py new file mode 100644 index 00000000..d501f800 --- /dev/null +++ b/src/humanloop/utilities/helpers.py @@ -0,0 +1,21 @@ +import inspect +from typing import Any, Callable + + +def args_to_inputs(func: Callable, args: tuple, kwargs: dict) -> dict[str, Any]: + """Maps arguments to their corresponding parameter names in the function signature. + + For example: + ```python + def foo(a, b=2, c=3): + pass + + assert args_to_inputs(foo, (1, 2), {}) == {'a': 1, 'b': 2, 'c': 3} + assert args_to_inputs(foo, (1,), {'b': 8}) == {'a': 1, 'b': 8, 'c': 3} + assert args_to_inputs(foo, (1,), {}) == {'a': 1, 'b': 2, 'c': 3} + ``` + """ + signature = inspect.signature(func) + bound_args = signature.bind(*args, **kwargs) + bound_args.apply_defaults() + return dict(bound_args.arguments) diff --git a/src/humanloop/utilities/prompt.py b/src/humanloop/utilities/prompt.py new file mode 100644 index 00000000..4e0f55f5 --- /dev/null +++ b/src/humanloop/utilities/prompt.py @@ -0,0 +1,88 @@ +import logging +from functools import wraps +from typing import Any, Callable, Mapping, Optional, Sequence + +from opentelemetry.sdk.trace import Span +from opentelemetry.trace import Tracer +from typing_extensions import Unpack + +from humanloop.utilities.helpers import args_to_inputs +from humanloop.utilities.types import DecoratorPromptKernelRequestParams +from humanloop.eval_utils import File +from humanloop.otel.constants import ( + HUMANLOOP_FILE_KEY, + HUMANLOOP_FILE_TYPE_KEY, + HUMANLOOP_LOG_KEY, + HUMANLOOP_PATH_KEY, +) +from humanloop.otel.helpers import jsonify_if_not_string, write_to_opentelemetry_span + +logger = logging.getLogger("humanloop.sdk") + + +def prompt( + opentelemetry_tracer: Tracer, + path: Optional[str] = None, + # TODO: Template can be a list of objects? 
+ **prompt_kernel: Unpack[DecoratorPromptKernelRequestParams], # type: ignore +): + def decorator(func: Callable): + @wraps(func) + def wrapper(*args: Sequence[Any], **kwargs: Mapping[str, Any]) -> Any: + span: Span + with opentelemetry_tracer.start_as_current_span("humanloop.prompt") as span: # type: ignore + span.set_attribute(HUMANLOOP_PATH_KEY, path if path else func.__name__) + span.set_attribute(HUMANLOOP_FILE_TYPE_KEY, "prompt") + + if prompt_kernel: + write_to_opentelemetry_span( + span=span, + key=f"{HUMANLOOP_FILE_KEY}.prompt", + value={ + **prompt_kernel, # type: ignore + "attributes": prompt_kernel.get("attributes") or None, # type: ignore + }, # type: ignore + ) + + # Call the decorated function + try: + output = func(*args, **kwargs) + output_stringified = jsonify_if_not_string( + func=func, + output=output, + ) + error = None + except Exception as e: + logger.error(f"Error calling {func.__name__}: {e}") + output = None + output_stringified = jsonify_if_not_string( + func=func, + output=output, + ) + error = str(e) + + prompt_log = { + "inputs": args_to_inputs(func, args, kwargs), + "output": output_stringified, + "error": error, + } + + write_to_opentelemetry_span( + span=span, + key=HUMANLOOP_LOG_KEY, + value=prompt_log, # type: ignore + ) + + # Return the output of the decorated function + return output + + wrapper.file = File( # type: ignore + path=path if path else func.__name__, + type="prompt", + version={**prompt_kernel}, # type: ignore + callable=wrapper, + ) + + return wrapper + + return decorator diff --git a/src/humanloop/utilities/tool.py b/src/humanloop/utilities/tool.py new file mode 100644 index 00000000..c17903d1 --- /dev/null +++ b/src/humanloop/utilities/tool.py @@ -0,0 +1,505 @@ +import builtins +import inspect +import logging +import sys +import textwrap +import typing +from dataclasses import dataclass +from functools import wraps +from inspect import Parameter +from typing import Any, Callable, Literal, Mapping, Optional, Sequence, TypedDict, Union + +from opentelemetry.trace import Tracer +from typing_extensions import Unpack + +from humanloop.utilities.helpers import args_to_inputs +from humanloop.eval_utils import File +from humanloop.otel.constants import ( + HUMANLOOP_FILE_KEY, + HUMANLOOP_FILE_TYPE_KEY, + HUMANLOOP_LOG_KEY, + HUMANLOOP_PATH_KEY, +) +from humanloop.otel.helpers import jsonify_if_not_string, write_to_opentelemetry_span +from humanloop.requests.tool_function import ToolFunctionParams +from humanloop.requests.tool_kernel_request import ToolKernelRequestParams + +if sys.version_info >= (3, 10): + import types + +logger = logging.getLogger("humanloop.sdk") + + +def tool( + opentelemetry_tracer: Tracer, + path: Optional[str] = None, + **tool_kernel: Unpack[ToolKernelRequestParams], # type: ignore +): + def decorator(func: Callable): + enhanced_tool_kernel = _build_tool_kernel( + func=func, + attributes=tool_kernel.get("attributes"), + setup_values=tool_kernel.get("setup_values"), + strict=True, + ) + + # Mypy complains about adding attribute on function, but it's nice UX + func.json_schema = enhanced_tool_kernel["function"] # type: ignore + + @wraps(func) + def wrapper(*args, **kwargs): + with opentelemetry_tracer.start_as_current_span("humanloop.tool") as span: + # Write the Tool Kernel to the Span on HL_FILE_OT_KEY + span.set_attribute(HUMANLOOP_PATH_KEY, path if path else func.__name__) + span.set_attribute(HUMANLOOP_FILE_TYPE_KEY, "tool") + if enhanced_tool_kernel: + write_to_opentelemetry_span( + span=span, + 
key=f"{HUMANLOOP_FILE_KEY}.tool", + value=enhanced_tool_kernel, + ) + + # Call the decorated function + try: + output = func(*args, **kwargs) + output_stringified = jsonify_if_not_string( + func=func, + output=output, + ) + error = None + except Exception as e: + logger.error(f"Error calling {func.__name__}: {e}") + output = None + output_stringified = jsonify_if_not_string( + func=func, + output=output, + ) + error = str(e) + + # Populate known Tool Log attributes + tool_log = { + "inputs": args_to_inputs(func, args, kwargs), + "output": output_stringified, + "error": error, + } + + # Write the Tool Log to the Span on HL_LOG_OT_KEY + write_to_opentelemetry_span( + span=span, + key=HUMANLOOP_LOG_KEY, + value=tool_log, + ) + + # Return the output of the decorated function + return output + + wrapper.file = File( # type: ignore + path=path if path else func.__name__, + type="tool", + version=enhanced_tool_kernel, + callable=wrapper, + ) + + return wrapper + + return decorator + + +def _build_tool_kernel( + func: Callable, + attributes: Optional[dict[str, Optional[Any]]], + setup_values: Optional[dict[str, Optional[Any]]], + strict: bool, +) -> ToolKernelRequestParams: + """Build ToolKernelRequest object from decorated function.""" + try: + source_code = textwrap.dedent(inspect.getsource(func)) + except TypeError as e: + raise TypeError( + f"Cannot extract source code for function {func.__name__}. " + "Try decorating a plain function instead of a partial for example." + ) from e + # Remove decorator from source code by finding first 'def' + # This makes the source_code extraction idempotent whether + # the decorator is applied directly or used as a higher-order + # function + source_code = source_code[source_code.find("def") :] + kernel = ToolKernelRequestParams( + source_code=source_code, + function=_build_function_property( + func=func, + strict=strict, + ), + ) + if attributes: + kernel["attributes"] = attributes + if setup_values: + kernel["setup_values"] = setup_values + return kernel + + +def _build_function_property(func: Callable, strict: bool) -> ToolFunctionParams: + """Build `function` property inside ToolKernelRequest.""" + tool_name = func.__name__ + description = func.__doc__ + if description is None: + description = "" + return ToolFunctionParams( + name=tool_name, + description=description, + parameters=_build_function_parameters_property(func), # type: ignore + strict=strict, + ) + + +class _JSONSchemaFunctionParameters(TypedDict): + type: str + properties: dict[str, typing.Union[dict, list]] + required: list[str] + additionalProperties: Literal[False] + + +def _build_function_parameters_property(func) -> _JSONSchemaFunctionParameters: + """Build `function.parameters` property inside ToolKernelRequest.""" + properties: dict[str, Any] = {} + required: list[str] = [] + signature = inspect.signature(func) + + for parameter in signature.parameters.values(): + if parameter.kind in ( + inspect.Parameter.VAR_POSITIONAL, + inspect.Parameter.VAR_KEYWORD, + ): + raise ValueError(f"{func.__name__}: *args and **kwargs are not supported by the @tool decorator") + + for parameter in signature.parameters.values(): + try: + parameter_signature = _parse_annotation(parameter.annotation) + except ValueError as e: + raise ValueError(f"Error parsing signature of @tool annotated function {func.__name__}: {e}") from e + param_json_schema = _annotation_parse_to_json_schema(parameter_signature) + properties[parameter.name] = param_json_schema + if not _parameter_is_optional(parameter): + 
required.append(parameter.name) + + if len(properties) == 0 and len(required) == 0: + # Edge case: function with no parameters + return _JSONSchemaFunctionParameters( + type="object", + properties={}, + required=[], + additionalProperties=False, + ) + return _JSONSchemaFunctionParameters( + type="object", + # False positive, expected tuple[str] but got tuple[str, ...] + required=tuple(required), # type: ignore + properties=properties, + additionalProperties=False, + ) + + +if sys.version_info >= (3, 11): + _PRIMITIVE_TYPES = Union[ + str, + int, + float, + bool, + Parameter.empty, # type: ignore + Ellipsis, # type: ignore + ] +else: + # Ellipsis not supported as type before Python 3.11 + _PRIMITIVE_TYPES = Union[ + str, + int, + float, + bool, + Parameter.empty, # type: ignore + ] + + +@dataclass +class _ParsedAnnotation: + def no_type_hint(self) -> bool: + """Check if the annotation has no type hint. + + Examples: + str -> False + list -> True + list[str] -> False + """ + raise NotImplementedError + + +@dataclass +class _ParsedPrimitiveAnnotation(_ParsedAnnotation): + annotation: _PRIMITIVE_TYPES + + def no_type_hint(self) -> bool: + return self.annotation is Parameter.empty or self.annotation is Ellipsis + + +@dataclass +class _ParsedDictAnnotation(_ParsedAnnotation): + # Both are null if no type hint e.g. dict vs dict[str, int] + key_annotation: Optional[_ParsedAnnotation] + value_annotation: Optional[_ParsedAnnotation] + + def no_type_hint(self) -> bool: + return self.key_annotation is None and self.value_annotation is None + + +@dataclass +class _ParsedTupleAnnotation(_ParsedAnnotation): + # Null if no type hint e.g. tuple vs tuple[str, int] + annotation: Optional[list[_ParsedAnnotation]] + + def no_type_hint(self) -> bool: + return self.annotation is None + + +@dataclass +class _ParsedUnionAnnotation(_ParsedAnnotation): + annotation: list[_ParsedAnnotation] + + +@dataclass +class _ParsedListAnnotation(_ParsedAnnotation): + # Null if no type hint e.g. list vs list[str] + annotation: Optional[_ParsedAnnotation] + + +@dataclass +class _ParsedOptionalAnnotation(_ParsedAnnotation): + annotation: _ParsedAnnotation + + +def _parse_annotation(annotation: typing.Type) -> _ParsedAnnotation: + """Parse constituent parts of a potentially nested type hint. + + Custom types are not supported, only built-in types and typing module types. 
+ + """ + origin = typing.get_origin(annotation) + if origin is None: + # Either not a nested type or no type hint + # Parameter.empty is used for parameters without type hints + # Ellipsis is interpreted as Any + if annotation not in ( + str, + int, + float, + bool, + Parameter.empty, + Ellipsis, + dict, + list, + tuple, + ): + raise ValueError(f"Unsupported type hint: {annotation}") + + # Check if it's a complex type with no inner type + if annotation == builtins.dict: + return _ParsedDictAnnotation( + value_annotation=None, + key_annotation=None, + ) + if annotation == builtins.list: + return _ParsedListAnnotation( + annotation=None, + ) + if annotation == builtins.tuple: + return _ParsedTupleAnnotation( + annotation=None, + ) + + # Is a primitive type + return _ParsedPrimitiveAnnotation( + annotation=annotation, + ) + + if origin is list: + inner_annotation = _parse_annotation(typing.get_args(annotation)[0]) + return _ParsedListAnnotation( + annotation=inner_annotation, + ) + + if origin is dict: + key_type = _parse_annotation(typing.get_args(annotation)[0]) + value_type = _parse_annotation(typing.get_args(annotation)[1]) + return _ParsedDictAnnotation( + key_annotation=key_type, + value_annotation=value_type, + ) + + if origin is tuple: + return _ParsedTupleAnnotation( + annotation=[_parse_annotation(arg) for arg in typing.get_args(annotation)], + ) + + if origin is typing.Union or (sys.version_info >= (3, 10) and origin is types.UnionType): + sub_types = typing.get_args(annotation) + if sub_types[-1] is type(None): + # type(None) in sub_types indicates Optional type + if len(sub_types) == 2: + # Union is an Optional type only + return _ParsedOptionalAnnotation( + annotation=_parse_annotation(sub_types[0]), + ) + # Union has sub_types and is Optional + return _ParsedOptionalAnnotation( + annotation=_ParsedUnionAnnotation( + annotation=[_parse_annotation(sub_type) for sub_type in sub_types[:-1]], + ) + ) + # Union type that is not Optional + return _ParsedUnionAnnotation( + annotation=[_parse_annotation(sub_type) for sub_type in sub_types], + ) + + raise ValueError(f"Unsupported origin: {origin}") + + +_JSON_SCHEMA_ANY = ["string", "integer", "number", "boolean", "object", "array", "null"] + + +def _annotation_parse_to_json_schema( + arg: _ParsedAnnotation, +) -> Mapping[str, Union[str, Mapping, Sequence]]: + """ + Convert parse result from _parse_annotation to JSON Schema for a parameter. + + The function recursively converts the nested type hints to a JSON Schema. + + Note that 'any' is not supported by JSON Schema, so we allow any type as a workaround. 
+ """ + arg_type: Mapping[str, Union[str, Mapping, Sequence]] + + if isinstance(arg, _ParsedOptionalAnnotation): + is_optional = True + arg = arg.annotation + else: + is_optional = False + + if isinstance(arg, _ParsedUnionAnnotation): + arg_type = { + "anyOf": [_annotation_parse_to_json_schema(sub_type) for sub_type in arg.annotation], + } + + elif isinstance(arg, _ParsedTupleAnnotation): + if arg.annotation is None: + # tuple annotation with no type hints + # This is equivalent with a list, since the + # number of elements is not specified + arg_type = { + "type": "array", + "items": {"type": _JSON_SCHEMA_ANY}, + } + else: + arg_type = { + "type": "array", + "items": [_annotation_parse_to_json_schema(sub_type) for sub_type in arg.annotation], + } + + elif isinstance(arg, _ParsedListAnnotation): + if arg.annotation is None: + # list annotation with no type hints + if is_optional: + arg_type = { + "type": ["array", "null"], + "items": {"type": _JSON_SCHEMA_ANY}, + } + else: + arg_type = { + "type": "array", + "items": {"type": _JSON_SCHEMA_ANY}, + } + else: + arg_type = { + "type": "array", + "items": _annotation_parse_to_json_schema(arg.annotation), + } + + elif isinstance(arg, _ParsedDictAnnotation): + if arg.key_annotation is None and arg.value_annotation is None: + # dict annotation with no type hints + if is_optional: + arg_type = { + "type": ["object", "null"], + "properties": { + "key": {"type": _JSON_SCHEMA_ANY}, + "value": {"type": _JSON_SCHEMA_ANY}, + }, + } + else: + arg_type = { + "type": "object", + "properties": { + "key": {"type": _JSON_SCHEMA_ANY}, + "value": {"type": _JSON_SCHEMA_ANY}, + }, + } + else: + arg_type = { + "type": "object", + "properties": { + "key": _annotation_parse_to_json_schema(arg.key_annotation), # type: ignore + "value": _annotation_parse_to_json_schema(arg.value_annotation), # type: ignore + }, + } + + elif isinstance(arg, _ParsedPrimitiveAnnotation): + if arg.annotation is builtins.str: + arg_type = {"type": "string"} + if arg.annotation is builtins.int: + arg_type = {"type": "integer"} + if arg.annotation is builtins.float: + arg_type = {"type": "number"} + if arg.annotation is builtins.bool: + arg_type = {"type": "boolean"} + if arg.annotation is Parameter.empty or arg.annotation is Ellipsis: + # JSON Schema dropped support for 'any' type, we allow any type as a workaround + arg_type = {"type": _JSON_SCHEMA_ANY} + + else: + raise ValueError(f"Unsupported annotation type: {arg}") + + if is_optional: + if isinstance(arg, _ParsedUnionAnnotation): + for type_option in arg_type["anyOf"]: + if ( + isinstance(type_option["type"], list) # type: ignore + and "null" not in type_option["type"] # type: ignore + ): # type: ignore + type_option["type"] = [*type_option["type"], "null"] # type: ignore + elif not isinstance(type_option["type"], list): # type: ignore + type_option["type"] = [type_option["type"], "null"] # type: ignore + else: + if isinstance(arg_type["type"], list) and "null" not in arg_type["type"]: # type: ignore + arg_type = {**arg_type, "type": [*arg_type["type"], "null"]} # type: ignore + elif not isinstance(arg_type["type"], list): # type: ignore + arg_type = {**arg_type, "type": [arg_type["type"], "null"]} # type: ignore + + return arg_type + + +def _parameter_is_optional( + parameter: inspect.Parameter, +) -> bool: + """Check if tool parameter is mandatory. 
+ + Examples: + Optional[T] -> True + T | None -> True + T -> False + """ + # Check if the parameter can be None, either via Optional[T] or T | None type hint + origin = typing.get_origin(parameter.annotation) + # sub_types refers to T inside the annotation + sub_types = typing.get_args(parameter.annotation) + return ( + (origin is typing.Union or (sys.version_info >= (3, 10) and origin is types.UnionType)) + and len(sub_types) > 0 + and sub_types[-1] is type(None) + ) diff --git a/src/humanloop/utilities/types.py b/src/humanloop/utilities/types.py new file mode 100644 index 00000000..f52f0178 --- /dev/null +++ b/src/humanloop/utilities/types.py @@ -0,0 +1,12 @@ +from typing_extensions import NotRequired + +from humanloop.requests.prompt_kernel_request import PromptKernelRequestParams + + +class DecoratorPromptKernelRequestParams(PromptKernelRequestParams): + """See :class:`PromptKernelRequestParams` for more information. + + Allows the `model` field to be optional for Prompt decorator. + """ + + model: NotRequired[str] # type: ignore From 58a1b957c6b9f4ade958220c56e19769d0e62b4d Mon Sep 17 00:00:00 2001 From: Andrei Bratu Date: Mon, 6 Jan 2025 11:01:27 +0000 Subject: [PATCH 02/11] Fixing flow complete --- src/humanloop/client.py | 8 +- src/humanloop/decorators/flow.py | 106 ---- src/humanloop/decorators/helpers.py | 21 - src/humanloop/decorators/prompt.py | 94 ---- src/humanloop/decorators/tool.py | 519 ------------------ src/humanloop/decorators/types.py | 12 - src/humanloop/otel/__init__.py | 4 - src/humanloop/otel/exporter.py | 31 +- tests/decorators/__init__.py | 0 .../utilities}/__init__.py | 0 .../test_flow_decorator.py | 48 +- .../test_prompt_decorator.py | 2 +- .../test_tool_decorator.py | 2 +- 13 files changed, 18 insertions(+), 829 deletions(-) delete mode 100644 src/humanloop/decorators/flow.py delete mode 100644 src/humanloop/decorators/helpers.py delete mode 100644 src/humanloop/decorators/prompt.py delete mode 100644 src/humanloop/decorators/tool.py delete mode 100644 src/humanloop/decorators/types.py delete mode 100644 tests/decorators/__init__.py rename {src/humanloop/decorators => tests/utilities}/__init__.py (100%) rename tests/{decorators => utilities}/test_flow_decorator.py (77%) rename tests/{decorators => utilities}/test_prompt_decorator.py (99%) rename tests/{decorators => utilities}/test_tool_decorator.py (99%) diff --git a/src/humanloop/client.py b/src/humanloop/client.py index 2d582dbf..af2b1f38 100644 --- a/src/humanloop/client.py +++ b/src/humanloop/client.py @@ -10,16 +10,16 @@ from opentelemetry.trace import Tracer from humanloop.core.client_wrapper import SyncClientWrapper -from humanloop.decorators.types import DecoratorPromptKernelRequestParams +from humanloop.utilities.types import DecoratorPromptKernelRequestParams from humanloop.eval_utils.context import EVALUATION_CONTEXT_VARIABLE_NAME, EvaluationContext from humanloop.eval_utils import log_with_evaluation_context, run_eval from humanloop.eval_utils.types import Dataset, Evaluator, EvaluatorCheck, File from humanloop.base_client import AsyncBaseHumanloop, BaseHumanloop -from humanloop.decorators.flow import flow as flow_decorator_factory -from humanloop.decorators.prompt import prompt as prompt_decorator_factory -from humanloop.decorators.tool import tool as tool_decorator_factory +from humanloop.utilities.flow import flow as flow_decorator_factory +from humanloop.utilities.prompt import prompt as prompt_decorator_factory +from humanloop.utilities.tool import tool as tool_decorator_factory from 
humanloop.environment import HumanloopEnvironment from humanloop.evaluations.client import EvaluationsClient from humanloop.otel import instrument_provider diff --git a/src/humanloop/decorators/flow.py b/src/humanloop/decorators/flow.py deleted file mode 100644 index 51f9c731..00000000 --- a/src/humanloop/decorators/flow.py +++ /dev/null @@ -1,106 +0,0 @@ -import logging -from functools import wraps -from typing import Any, Callable, Mapping, Optional, Sequence - -from opentelemetry.sdk.trace import Span -from opentelemetry.trace import Tracer -from typing_extensions import Unpack - -from humanloop.decorators.helpers import args_to_inputs -from humanloop.eval_utils.types import File -from humanloop.otel import TRACE_FLOW_CONTEXT, FlowContext -from humanloop.otel.constants import HUMANLOOP_FILE_KEY, HUMANLOOP_FILE_TYPE_KEY, HUMANLOOP_LOG_KEY, HUMANLOOP_PATH_KEY -from humanloop.otel.helpers import generate_span_id, jsonify_if_not_string, write_to_opentelemetry_span -from humanloop.requests import FlowKernelRequestParams as FlowDict -from humanloop.requests.flow_kernel_request import FlowKernelRequestParams - -logger = logging.getLogger("humanloop.sdk") - - -def flow( - opentelemetry_tracer: Tracer, - path: Optional[str] = None, - **flow_kernel: Unpack[FlowKernelRequestParams], # type: ignore -): - flow_kernel["attributes"] = {k: v for k, v in flow_kernel.get("attributes", {}).items() if v is not None} - - def decorator(func: Callable): - @wraps(func) - def wrapper(*args: Sequence[Any], **kwargs: Mapping[str, Any]) -> Any: - span: Span - with opentelemetry_tracer.start_as_current_span(generate_span_id()) as span: # type: ignore - span_id = span.get_span_context().span_id - if span.parent: - span_parent_id = span.parent.span_id - parent_trace_metadata = TRACE_FLOW_CONTEXT.get(span_parent_id) - if parent_trace_metadata: - TRACE_FLOW_CONTEXT[span_id] = FlowContext( - trace_id=span_id, - trace_parent_id=span_parent_id, - is_flow_log=True, - ) - - else: - # The Flow Log is not nested under another Flow Log - # Set the trace_id to the current span_id - TRACE_FLOW_CONTEXT[span_id] = FlowContext( - trace_id=span_id, - trace_parent_id=None, - is_flow_log=True, - ) - - span.set_attribute(HUMANLOOP_PATH_KEY, path if path else func.__name__) - span.set_attribute(HUMANLOOP_FILE_TYPE_KEY, "flow") - if flow_kernel: - write_to_opentelemetry_span( - span=span, - key=f"{HUMANLOOP_FILE_KEY}.flow", - value=flow_kernel, # type: ignore - ) - - inputs = args_to_inputs(func, args, kwargs) - - # Call the decorated function - try: - output = func(*args, **kwargs) - output_stringified = jsonify_if_not_string( - func=func, - output=output, - ) - error = None - except Exception as e: - logger.error(f"Error calling {func.__name__}: {e}") - output = None - output_stringified = jsonify_if_not_string( - func=func, - output=None, - ) - error = str(e) - - flow_log = { - "inputs": inputs, - "output": output_stringified, - "error": error, - } - - # Write the Flow Log to the Span on HL_LOG_OT_KEY - if flow_log: - write_to_opentelemetry_span( - span=span, - key=HUMANLOOP_LOG_KEY, - value=flow_log, # type: ignore - ) - - # Return the output of the decorated function - return output - - wrapper.file = File( # type: ignore - path=path if path else func.__name__, - type="flow", - version=FlowDict(**flow_kernel), # type: ignore - callable=wrapper, - ) - - return wrapper - - return decorator diff --git a/src/humanloop/decorators/helpers.py b/src/humanloop/decorators/helpers.py deleted file mode 100644 index d501f800..00000000 --- 
a/src/humanloop/decorators/helpers.py +++ /dev/null @@ -1,21 +0,0 @@ -import inspect -from typing import Any, Callable - - -def args_to_inputs(func: Callable, args: tuple, kwargs: dict) -> dict[str, Any]: - """Maps arguments to their corresponding parameter names in the function signature. - - For example: - ```python - def foo(a, b=2, c=3): - pass - - assert args_to_inputs(foo, (1, 2), {}) == {'a': 1, 'b': 2, 'c': 3} - assert args_to_inputs(foo, (1,), {'b': 8}) == {'a': 1, 'b': 8, 'c': 3} - assert args_to_inputs(foo, (1,), {}) == {'a': 1, 'b': 2, 'c': 3} - ``` - """ - signature = inspect.signature(func) - bound_args = signature.bind(*args, **kwargs) - bound_args.apply_defaults() - return dict(bound_args.arguments) diff --git a/src/humanloop/decorators/prompt.py b/src/humanloop/decorators/prompt.py deleted file mode 100644 index c1f68a77..00000000 --- a/src/humanloop/decorators/prompt.py +++ /dev/null @@ -1,94 +0,0 @@ -import logging -from functools import wraps -from typing import Any, Callable, Mapping, Optional, Sequence - -from opentelemetry.sdk.trace import Span -from opentelemetry.trace import Tracer -from typing_extensions import Unpack - -from humanloop.decorators.helpers import args_to_inputs -from humanloop.decorators.types import DecoratorPromptKernelRequestParams -from humanloop.eval_utils import File -from humanloop.otel import TRACE_FLOW_CONTEXT, FlowContext -from humanloop.otel.constants import HUMANLOOP_FILE_KEY, HUMANLOOP_FILE_TYPE_KEY, HUMANLOOP_LOG_KEY, HUMANLOOP_PATH_KEY -from humanloop.otel.helpers import generate_span_id, jsonify_if_not_string, write_to_opentelemetry_span - -logger = logging.getLogger("humanloop.sdk") - - -def prompt( - opentelemetry_tracer: Tracer, - path: Optional[str] = None, - # TODO: Template can be a list of objects? 
- **prompt_kernel: Unpack[DecoratorPromptKernelRequestParams], # type: ignore -): - def decorator(func: Callable): - @wraps(func) - def wrapper(*args: Sequence[Any], **kwargs: Mapping[str, Any]) -> Any: - span: Span - with opentelemetry_tracer.start_as_current_span(generate_span_id()) as span: # type: ignore - span_id = span.get_span_context().span_id - if span.parent: - span_parent_id = span.parent.span_id - parent_trace_metadata = TRACE_FLOW_CONTEXT.get(span_parent_id, {}) - if parent_trace_metadata: - TRACE_FLOW_CONTEXT[span_id] = FlowContext( - trace_id=parent_trace_metadata["trace_id"], - trace_parent_id=span_parent_id, - is_flow_log=False, - ) - - span.set_attribute(HUMANLOOP_PATH_KEY, path if path else func.__name__) - span.set_attribute(HUMANLOOP_FILE_TYPE_KEY, "prompt") - - if prompt_kernel: - write_to_opentelemetry_span( - span=span, - key=f"{HUMANLOOP_FILE_KEY}.prompt", - value={ - **prompt_kernel, # type: ignore - "attributes": prompt_kernel.get("attributes") or None, # type: ignore - }, # type: ignore - ) - - # Call the decorated function - try: - output = func(*args, **kwargs) - output_stringified = jsonify_if_not_string( - func=func, - output=output, - ) - error = None - except Exception as e: - logger.error(f"Error calling {func.__name__}: {e}") - output = None - output_stringified = jsonify_if_not_string( - func=func, - output=output, - ) - error = str(e) - - prompt_log = { - "inputs": args_to_inputs(func, args, kwargs), - "output": output_stringified, - "error": error, - } - write_to_opentelemetry_span( - span=span, - key=HUMANLOOP_LOG_KEY, - value=prompt_log, # type: ignore - ) - - # Return the output of the decorated function - return output - - wrapper.file = File( # type: ignore - path=path if path else func.__name__, - type="prompt", - version={**prompt_kernel}, # type: ignore - callable=wrapper, - ) - - return wrapper - - return decorator diff --git a/src/humanloop/decorators/tool.py b/src/humanloop/decorators/tool.py deleted file mode 100644 index 2752d017..00000000 --- a/src/humanloop/decorators/tool.py +++ /dev/null @@ -1,519 +0,0 @@ -import builtins -import inspect -import logging -import sys -import textwrap -import typing -from dataclasses import dataclass -from functools import wraps -from inspect import Parameter -from typing import Any, Callable, Literal, Mapping, Optional, Sequence, TypedDict, Union - -from opentelemetry.trace import Tracer -from typing_extensions import Unpack - -from humanloop.decorators.helpers import args_to_inputs -from humanloop.eval_utils import File -from humanloop.otel import TRACE_FLOW_CONTEXT, FlowContext -from humanloop.otel.constants import ( - HUMANLOOP_FILE_KEY, - HUMANLOOP_FILE_TYPE_KEY, - HUMANLOOP_LOG_KEY, - HUMANLOOP_PATH_KEY, -) -from humanloop.otel.helpers import generate_span_id, jsonify_if_not_string, write_to_opentelemetry_span -from humanloop.requests.tool_function import ToolFunctionParams -from humanloop.requests.tool_kernel_request import ToolKernelRequestParams - -if sys.version_info >= (3, 10): - import types - -logger = logging.getLogger("humanloop.sdk") - - -def tool( - opentelemetry_tracer: Tracer, - path: Optional[str] = None, - **tool_kernel: Unpack[ToolKernelRequestParams], # type: ignore -): - def decorator(func: Callable): - enhanced_tool_kernel = _build_tool_kernel( - func=func, - attributes=tool_kernel.get("attributes"), - setup_values=tool_kernel.get("setup_values"), - strict=True, - ) - - # Mypy complains about adding attribute on function, but it's nice UX - func.json_schema = 
enhanced_tool_kernel["function"] # type: ignore - - @wraps(func) - def wrapper(*args, **kwargs): - with opentelemetry_tracer.start_as_current_span(generate_span_id()) as span: - span_id = span.get_span_context().span_id - if span.parent: - span_parent_id = span.parent.span_id - else: - span_parent_id = None - parent_trace_metadata = TRACE_FLOW_CONTEXT.get(span_parent_id) - if parent_trace_metadata: - TRACE_FLOW_CONTEXT[span_id] = FlowContext( - span_id=span_id, - trace_parent_id=span_parent_id, - is_flow_log=False, - ) - - # Write the Tool Kernel to the Span on HL_FILE_OT_KEY - span.set_attribute(HUMANLOOP_PATH_KEY, path if path else func.__name__) - span.set_attribute(HUMANLOOP_FILE_TYPE_KEY, "tool") - if enhanced_tool_kernel: - write_to_opentelemetry_span( - span=span, - key=f"{HUMANLOOP_FILE_KEY}.tool", - value=enhanced_tool_kernel, - ) - - # Call the decorated function - try: - output = func(*args, **kwargs) - output_stringified = jsonify_if_not_string( - func=func, - output=output, - ) - error = None - except Exception as e: - logger.error(f"Error calling {func.__name__}: {e}") - output = None - output_stringified = jsonify_if_not_string( - func=func, - output=output, - ) - error = str(e) - - # Populate known Tool Log attributes - tool_log = { - "inputs": args_to_inputs(func, args, kwargs), - "output": output_stringified, - "error": error, - } - - # Write the Tool Log to the Span on HL_LOG_OT_KEY - write_to_opentelemetry_span( - span=span, - key=HUMANLOOP_LOG_KEY, - value=tool_log, - ) - - # Return the output of the decorated function - return output - - wrapper.file = File( # type: ignore - path=path if path else func.__name__, - type="tool", - version=enhanced_tool_kernel, - callable=wrapper, - ) - - return wrapper - - return decorator - - -def _build_tool_kernel( - func: Callable, - attributes: Optional[dict[str, Optional[Any]]], - setup_values: Optional[dict[str, Optional[Any]]], - strict: bool, -) -> ToolKernelRequestParams: - """Build ToolKernelRequest object from decorated function.""" - try: - source_code = textwrap.dedent(inspect.getsource(func)) - except TypeError as e: - raise TypeError( - f"Cannot extract source code for function {func.__name__}. " - "Try decorating a plain function instead of a partial for example." 
- ) from e - # Remove decorator from source code by finding first 'def' - # This makes the source_code extraction idempotent whether - # the decorator is applied directly or used as a higher-order - # function - source_code = source_code[source_code.find("def") :] - kernel = ToolKernelRequestParams( - source_code=source_code, - function=_build_function_property( - func=func, - strict=strict, - ), - ) - if attributes: - kernel["attributes"] = attributes - if setup_values: - kernel["setup_values"] = setup_values - return kernel - - -def _build_function_property(func: Callable, strict: bool) -> ToolFunctionParams: - """Build `function` property inside ToolKernelRequest.""" - tool_name = func.__name__ - description = func.__doc__ - if description is None: - description = "" - return ToolFunctionParams( - name=tool_name, - description=description, - parameters=_build_function_parameters_property(func), # type: ignore - strict=strict, - ) - - -class _JSONSchemaFunctionParameters(TypedDict): - type: str - properties: dict[str, typing.Union[dict, list]] - required: list[str] - additionalProperties: Literal[False] - - -def _build_function_parameters_property(func) -> _JSONSchemaFunctionParameters: - """Build `function.parameters` property inside ToolKernelRequest.""" - properties: dict[str, Any] = {} - required: list[str] = [] - signature = inspect.signature(func) - - for parameter in signature.parameters.values(): - if parameter.kind in ( - inspect.Parameter.VAR_POSITIONAL, - inspect.Parameter.VAR_KEYWORD, - ): - raise ValueError(f"{func.__name__}: *args and **kwargs are not supported by the @tool decorator") - - for parameter in signature.parameters.values(): - try: - parameter_signature = _parse_annotation(parameter.annotation) - except ValueError as e: - raise ValueError(f"Error parsing signature of @tool annotated function {func.__name__}: {e}") from e - param_json_schema = _annotation_parse_to_json_schema(parameter_signature) - properties[parameter.name] = param_json_schema - if not _parameter_is_optional(parameter): - required.append(parameter.name) - - if len(properties) == 0 and len(required) == 0: - # Edge case: function with no parameters - return _JSONSchemaFunctionParameters( - type="object", - properties={}, - required=[], - additionalProperties=False, - ) - return _JSONSchemaFunctionParameters( - type="object", - # False positive, expected tuple[str] but got tuple[str, ...] - required=tuple(required), # type: ignore - properties=properties, - additionalProperties=False, - ) - - -if sys.version_info >= (3, 11): - _PRIMITIVE_TYPES = Union[ - str, - int, - float, - bool, - Parameter.empty, # type: ignore - Ellipsis, # type: ignore - ] -else: - # Ellipsis not supported as type before Python 3.11 - _PRIMITIVE_TYPES = Union[ - str, - int, - float, - bool, - Parameter.empty, # type: ignore - ] - - -@dataclass -class _ParsedAnnotation: - def no_type_hint(self) -> bool: - """Check if the annotation has no type hint. - - Examples: - str -> False - list -> True - list[str] -> False - """ - raise NotImplementedError - - -@dataclass -class _ParsedPrimitiveAnnotation(_ParsedAnnotation): - annotation: _PRIMITIVE_TYPES - - def no_type_hint(self) -> bool: - return self.annotation is Parameter.empty or self.annotation is Ellipsis - - -@dataclass -class _ParsedDictAnnotation(_ParsedAnnotation): - # Both are null if no type hint e.g. 
dict vs dict[str, int] - key_annotation: Optional[_ParsedAnnotation] - value_annotation: Optional[_ParsedAnnotation] - - def no_type_hint(self) -> bool: - return self.key_annotation is None and self.value_annotation is None - - -@dataclass -class _ParsedTupleAnnotation(_ParsedAnnotation): - # Null if no type hint e.g. tuple vs tuple[str, int] - annotation: Optional[list[_ParsedAnnotation]] - - def no_type_hint(self) -> bool: - return self.annotation is None - - -@dataclass -class _ParsedUnionAnnotation(_ParsedAnnotation): - annotation: list[_ParsedAnnotation] - - -@dataclass -class _ParsedListAnnotation(_ParsedAnnotation): - # Null if no type hint e.g. list vs list[str] - annotation: Optional[_ParsedAnnotation] - - -@dataclass -class _ParsedOptionalAnnotation(_ParsedAnnotation): - annotation: _ParsedAnnotation - - -def _parse_annotation(annotation: typing.Type) -> _ParsedAnnotation: - """Parse constituent parts of a potentially nested type hint. - - Custom types are not supported, only built-in types and typing module types. - - """ - origin = typing.get_origin(annotation) - if origin is None: - # Either not a nested type or no type hint - # Parameter.empty is used for parameters without type hints - # Ellipsis is interpreted as Any - if annotation not in ( - str, - int, - float, - bool, - Parameter.empty, - Ellipsis, - dict, - list, - tuple, - ): - raise ValueError(f"Unsupported type hint: {annotation}") - - # Check if it's a complex type with no inner type - if annotation == builtins.dict: - return _ParsedDictAnnotation( - value_annotation=None, - key_annotation=None, - ) - if annotation == builtins.list: - return _ParsedListAnnotation( - annotation=None, - ) - if annotation == builtins.tuple: - return _ParsedTupleAnnotation( - annotation=None, - ) - - # Is a primitive type - return _ParsedPrimitiveAnnotation( - annotation=annotation, - ) - - if origin is list: - inner_annotation = _parse_annotation(typing.get_args(annotation)[0]) - return _ParsedListAnnotation( - annotation=inner_annotation, - ) - - if origin is dict: - key_type = _parse_annotation(typing.get_args(annotation)[0]) - value_type = _parse_annotation(typing.get_args(annotation)[1]) - return _ParsedDictAnnotation( - key_annotation=key_type, - value_annotation=value_type, - ) - - if origin is tuple: - return _ParsedTupleAnnotation( - annotation=[_parse_annotation(arg) for arg in typing.get_args(annotation)], - ) - - if origin is typing.Union or (sys.version_info >= (3, 10) and origin is types.UnionType): - sub_types = typing.get_args(annotation) - if sub_types[-1] is type(None): - # type(None) in sub_types indicates Optional type - if len(sub_types) == 2: - # Union is an Optional type only - return _ParsedOptionalAnnotation( - annotation=_parse_annotation(sub_types[0]), - ) - # Union has sub_types and is Optional - return _ParsedOptionalAnnotation( - annotation=_ParsedUnionAnnotation( - annotation=[_parse_annotation(sub_type) for sub_type in sub_types[:-1]], - ) - ) - # Union type that is not Optional - return _ParsedUnionAnnotation( - annotation=[_parse_annotation(sub_type) for sub_type in sub_types], - ) - - raise ValueError(f"Unsupported origin: {origin}") - - -_JSON_SCHEMA_ANY = ["string", "integer", "number", "boolean", "object", "array", "null"] - - -def _annotation_parse_to_json_schema( - arg: _ParsedAnnotation, -) -> Mapping[str, Union[str, Mapping, Sequence]]: - """ - Convert parse result from _parse_annotation to JSON Schema for a parameter. 
- - The function recursively converts the nested type hints to a JSON Schema. - - Note that 'any' is not supported by JSON Schema, so we allow any type as a workaround. - """ - arg_type: Mapping[str, Union[str, Mapping, Sequence]] - - if isinstance(arg, _ParsedOptionalAnnotation): - is_optional = True - arg = arg.annotation - else: - is_optional = False - - if isinstance(arg, _ParsedUnionAnnotation): - arg_type = { - "anyOf": [_annotation_parse_to_json_schema(sub_type) for sub_type in arg.annotation], - } - - elif isinstance(arg, _ParsedTupleAnnotation): - if arg.annotation is None: - # tuple annotation with no type hints - # This is equivalent with a list, since the - # number of elements is not specified - arg_type = { - "type": "array", - "items": {"type": _JSON_SCHEMA_ANY}, - } - else: - arg_type = { - "type": "array", - "items": [_annotation_parse_to_json_schema(sub_type) for sub_type in arg.annotation], - } - - elif isinstance(arg, _ParsedListAnnotation): - if arg.annotation is None: - # list annotation with no type hints - if is_optional: - arg_type = { - "type": ["array", "null"], - "items": {"type": _JSON_SCHEMA_ANY}, - } - else: - arg_type = { - "type": "array", - "items": {"type": _JSON_SCHEMA_ANY}, - } - else: - arg_type = { - "type": "array", - "items": _annotation_parse_to_json_schema(arg.annotation), - } - - elif isinstance(arg, _ParsedDictAnnotation): - if arg.key_annotation is None and arg.value_annotation is None: - # dict annotation with no type hints - if is_optional: - arg_type = { - "type": ["object", "null"], - "properties": { - "key": {"type": _JSON_SCHEMA_ANY}, - "value": {"type": _JSON_SCHEMA_ANY}, - }, - } - else: - arg_type = { - "type": "object", - "properties": { - "key": {"type": _JSON_SCHEMA_ANY}, - "value": {"type": _JSON_SCHEMA_ANY}, - }, - } - else: - arg_type = { - "type": "object", - "properties": { - "key": _annotation_parse_to_json_schema(arg.key_annotation), # type: ignore - "value": _annotation_parse_to_json_schema(arg.value_annotation), # type: ignore - }, - } - - elif isinstance(arg, _ParsedPrimitiveAnnotation): - if arg.annotation is builtins.str: - arg_type = {"type": "string"} - if arg.annotation is builtins.int: - arg_type = {"type": "integer"} - if arg.annotation is builtins.float: - arg_type = {"type": "number"} - if arg.annotation is builtins.bool: - arg_type = {"type": "boolean"} - if arg.annotation is Parameter.empty or arg.annotation is Ellipsis: - # JSON Schema dropped support for 'any' type, we allow any type as a workaround - arg_type = {"type": _JSON_SCHEMA_ANY} - - else: - raise ValueError(f"Unsupported annotation type: {arg}") - - if is_optional: - if isinstance(arg, _ParsedUnionAnnotation): - for type_option in arg_type["anyOf"]: - if ( - isinstance(type_option["type"], list) # type: ignore - and "null" not in type_option["type"] # type: ignore - ): # type: ignore - type_option["type"] = [*type_option["type"], "null"] # type: ignore - elif not isinstance(type_option["type"], list): # type: ignore - type_option["type"] = [type_option["type"], "null"] # type: ignore - else: - if isinstance(arg_type["type"], list) and "null" not in arg_type["type"]: # type: ignore - arg_type = {**arg_type, "type": [*arg_type["type"], "null"]} # type: ignore - elif not isinstance(arg_type["type"], list): # type: ignore - arg_type = {**arg_type, "type": [arg_type["type"], "null"]} # type: ignore - - return arg_type - - -def _parameter_is_optional( - parameter: inspect.Parameter, -) -> bool: - """Check if tool parameter is mandatory. 
- - Examples: - Optional[T] -> True - T | None -> True - T -> False - """ - # Check if the parameter can be None, either via Optional[T] or T | None type hint - origin = typing.get_origin(parameter.annotation) - # sub_types refers to T inside the annotation - sub_types = typing.get_args(parameter.annotation) - return ( - (origin is typing.Union or (sys.version_info >= (3, 10) and origin is types.UnionType)) - and len(sub_types) > 0 - and sub_types[-1] is type(None) - ) diff --git a/src/humanloop/decorators/types.py b/src/humanloop/decorators/types.py deleted file mode 100644 index f52f0178..00000000 --- a/src/humanloop/decorators/types.py +++ /dev/null @@ -1,12 +0,0 @@ -from typing_extensions import NotRequired - -from humanloop.requests.prompt_kernel_request import PromptKernelRequestParams - - -class DecoratorPromptKernelRequestParams(PromptKernelRequestParams): - """See :class:`PromptKernelRequestParams` for more information. - - Allows the `model` field to be optional for Prompt decorator. - """ - - model: NotRequired[str] # type: ignore diff --git a/src/humanloop/otel/__init__.py b/src/humanloop/otel/__init__.py index 82fd0c68..3442161e 100644 --- a/src/humanloop/otel/__init__.py +++ b/src/humanloop/otel/__init__.py @@ -1,7 +1,3 @@ -from typing import Optional, TypedDict - -from opentelemetry.sdk.trace import TracerProvider -from typing_extensions import NotRequired from opentelemetry.sdk.trace import TracerProvider from humanloop.otel.helpers import module_is_installed diff --git a/src/humanloop/otel/exporter.py b/src/humanloop/otel/exporter.py index 3417fe1b..730f7a16 100644 --- a/src/humanloop/otel/exporter.py +++ b/src/humanloop/otel/exporter.py @@ -14,7 +14,6 @@ from humanloop.core import ApiError as HumanloopApiError from humanloop.eval_utils.context import EVALUATION_CONTEXT_VARIABLE_NAME, EvaluationContext -from humanloop.otel import TRACE_FLOW_CONTEXT from humanloop.otel.constants import ( HUMANLOOP_FILE_KEY, HUMANLOOP_FILE_TYPE_KEY, @@ -176,36 +175,20 @@ def _do_work(self): self._client.evaluation_context_variable.set(evaluation_context) except EmptyQueue: continue - trace_metadata = TRACE_FLOW_CONTEXT.get(span_to_export.get_span_context().span_id) - if trace_metadata is None: + if span_to_export.parent is None: # Span is not part of a Flow Log self._export_span_dispatch(span_to_export) logger.debug( - "_do_work on Thread %s: Dispatched span %s with FlowContext %s which is not part of a Flow", + "_do_work on Thread %s: Starting to upload span %s", threading.get_ident(), - span_to_export.attributes, - trace_metadata, + span_to_export, ) - elif trace_metadata["trace_parent_id"] is None: - # Span is the head of a Flow Trace - self._export_span_dispatch(span_to_export) - logger.debug( - "Dispatched span %s which is a Flow Log with FlowContext %s", - span_to_export.attributes, - trace_metadata, - ) - elif trace_metadata["trace_parent_id"] in self._span_id_to_uploaded_log_id: + elif span_to_export.parent.span_id in self._span_id_to_uploaded_log_id: # Span is part of a Flow and its parent has been uploaded self._export_span_dispatch(span_to_export) - logger.debug( - "_do_work on Thread %s: Dispatched span %s after its parent %s with FlowContext %s", - threading.get_ident(), - span_to_export.attributes, - trace_metadata["trace_parent_id"], - trace_metadata, - ) + logger.debug("_do_work on Thread %s: Starting to upload span %s", threading.get_ident(), span_to_export) else: - # Requeue the Span to be uploaded later + # Requeue the Span and upload after its parent 
self._upload_queue.put((span_to_export, evaluation_context)) self._upload_queue.task_done() @@ -214,6 +197,7 @@ def _complete_flow_log(self, span_id: int) -> None: if span_id in flow_children_span_ids: flow_children_span_ids.remove(span_id) if len(flow_children_span_ids) == 0: + # All logs in the Trace have been uploaded, mark the Flow Log as complete flow_log_id = self._span_id_to_uploaded_log_id[flow_log_span_id] self._client.flows.update_log(log_id=flow_log_id, trace_status="complete") break @@ -346,7 +330,6 @@ def _export_flow(self, span: ReadableSpan) -> None: **log_object, trace_parent_id=trace_parent_id, ) - self._flow_logs_to_complete.append(log_response.id) self._span_id_to_uploaded_log_id[span.get_span_context().span_id] = log_response.id except HumanloopApiError as e: logger.error(str(e)) diff --git a/tests/decorators/__init__.py b/tests/decorators/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/humanloop/decorators/__init__.py b/tests/utilities/__init__.py similarity index 100% rename from src/humanloop/decorators/__init__.py rename to tests/utilities/__init__.py diff --git a/tests/decorators/test_flow_decorator.py b/tests/utilities/test_flow_decorator.py similarity index 77% rename from tests/decorators/test_flow_decorator.py rename to tests/utilities/test_flow_decorator.py index 09a769f6..95e708a0 100644 --- a/tests/decorators/test_flow_decorator.py +++ b/tests/utilities/test_flow_decorator.py @@ -5,10 +5,9 @@ from unittest.mock import patch import pytest -from humanloop.decorators.flow import flow -from humanloop.decorators.prompt import prompt -from humanloop.decorators.tool import tool -from humanloop.otel import TRACE_FLOW_CONTEXT +from humanloop.utilities.flow import flow +from humanloop.utilities.prompt import prompt +from humanloop.utilities.tool import tool from humanloop.otel.constants import HUMANLOOP_FILE_KEY from humanloop.otel.exporter import HumanloopSpanExporter from humanloop.otel.helpers import read_from_opentelemetry_span @@ -99,9 +98,6 @@ def test_decorators_without_flow( span=spans[2], key=HUMANLOOP_FILE_KEY, )["prompt"] - for span in spans: - # THEN no metadata related to trace is present on either of them - assert TRACE_FLOW_CONTEXT.get(span.get_span_context().span_id) is None def test_decorators_with_flow_decorator( @@ -135,18 +131,7 @@ def test_decorators_with_flow_decorator( # THEN the span are returned bottom to top assert read_from_opentelemetry_span(span=spans[1], key=HUMANLOOP_FILE_KEY)["tool"] assert read_from_opentelemetry_span(span=spans[2], key=HUMANLOOP_FILE_KEY)["prompt"] - # assert read_from_opentelemetry_span(span=spans[3], key=HL_FILE_OT_KEY)["flow"] - assert (tool_trace_metadata := TRACE_FLOW_CONTEXT.get(spans[1].get_span_context().span_id)) - assert (prompt_trace_metadata := TRACE_FLOW_CONTEXT.get(spans[2].get_span_context().span_id)) - assert (flow_trace_metadata := TRACE_FLOW_CONTEXT.get(spans[3].get_span_context().span_id)) - # THEN Tool span is a child of Prompt span - assert tool_trace_metadata["trace_parent_id"] == spans[2].context.span_id - assert tool_trace_metadata["is_flow_log"] is False - assert prompt_trace_metadata["trace_parent_id"] == spans[3].context.span_id - # THEN Prompt span is a child of Flow span - assert prompt_trace_metadata["is_flow_log"] is False - assert flow_trace_metadata["is_flow_log"] - assert flow_trace_metadata["trace_id"] == spans[3].context.span_id + assert read_from_opentelemetry_span(span=spans[3], key=HUMANLOOP_FILE_KEY)["flow"] def test_flow_decorator_flow_in_flow( @@ 
-175,25 +160,6 @@ def test_flow_decorator_flow_in_flow( with pytest.raises(KeyError): read_from_opentelemetry_span(span=spans[4], key=HUMANLOOP_FILE_KEY)["flow"] != {} - assert (tool_trace_metadata := TRACE_FLOW_CONTEXT.get(spans[1].get_span_context().span_id)) - assert (prompt_trace_metadata := TRACE_FLOW_CONTEXT.get(spans[2].get_span_context().span_id)) - assert (nested_flow_trace_metadata := TRACE_FLOW_CONTEXT.get(spans[3].get_span_context().span_id)) - assert (flow_trace_metadata := TRACE_FLOW_CONTEXT.get(spans[4].get_span_context().span_id)) - # THEN the parent of the Tool Log is the Prompt Log - assert tool_trace_metadata["trace_parent_id"] == spans[2].context.span_id - assert tool_trace_metadata["is_flow_log"] is False - # THEN the parent of the Prompt Log is the Flow Log - assert prompt_trace_metadata["trace_parent_id"] == spans[3].context.span_id - assert prompt_trace_metadata["is_flow_log"] is False - # THEN the nested Flow Log creates a new trace - assert nested_flow_trace_metadata["trace_id"] == spans[3].context.span_id - assert nested_flow_trace_metadata["is_flow_log"] - # THEN the parent of the nested Flow Log is the upper Flow Log - assert nested_flow_trace_metadata["trace_parent_id"] == spans[4].context.span_id - # THEN the parent Flow Log correctly points to itself - assert flow_trace_metadata["trace_id"] == spans[4].context.span_id - assert flow_trace_metadata["is_flow_log"] - def test_flow_decorator_with_hl_exporter( call_llm_messages: list[dict], @@ -291,8 +257,4 @@ def test_flow_decorator_hl_exporter_flow_inside_flow( # THEN the second to last uploaded span is the nested Flow flow_span = mock_export_method.call_args_list[4][0][0][0] nested_flow_span = mock_export_method.call_args_list[3][0][0][0] - assert (last_span_flow_metadata := TRACE_FLOW_CONTEXT.get(flow_span.get_span_context().span_id)) - assert (flow_span_flow_metadata := TRACE_FLOW_CONTEXT.get(nested_flow_span.get_span_context().span_id)) - assert flow_span_flow_metadata["trace_parent_id"] == flow_span.context.span_id - assert last_span_flow_metadata["is_flow_log"] - assert flow_span_flow_metadata["is_flow_log"] + assert nested_flow_span.parent.span_id == flow_span.context.span_id diff --git a/tests/decorators/test_prompt_decorator.py b/tests/utilities/test_prompt_decorator.py similarity index 99% rename from tests/decorators/test_prompt_decorator.py rename to tests/utilities/test_prompt_decorator.py index 23c4fb64..84705a67 100644 --- a/tests/decorators/test_prompt_decorator.py +++ b/tests/utilities/test_prompt_decorator.py @@ -11,7 +11,7 @@ from dotenv import load_dotenv from groq import Groq from groq import NotFoundError as GroqNotFoundError -from humanloop.decorators.prompt import prompt +from humanloop.utilities.prompt import prompt from humanloop.otel.constants import HUMANLOOP_FILE_KEY from humanloop.otel.helpers import is_humanloop_span, read_from_opentelemetry_span from humanloop.types.model_providers import ModelProviders diff --git a/tests/decorators/test_tool_decorator.py b/tests/utilities/test_tool_decorator.py similarity index 99% rename from tests/decorators/test_tool_decorator.py rename to tests/utilities/test_tool_decorator.py index 2f4db209..29cc8fe1 100644 --- a/tests/decorators/test_tool_decorator.py +++ b/tests/utilities/test_tool_decorator.py @@ -2,7 +2,7 @@ from typing import Any, Optional, TypedDict, Union import pytest -from humanloop.decorators.tool import tool +from humanloop.utilities.tool import tool from humanloop.otel.constants import HUMANLOOP_FILE_KEY, HUMANLOOP_LOG_KEY from 
humanloop.otel.helpers import read_from_opentelemetry_span from jsonschema.protocols import Validator From a44a9870c404c49a6ff5b38bb6a7e21320045fc6 Mon Sep 17 00:00:00 2001 From: Andrei Bratu Date: Mon, 6 Jan 2025 14:27:17 +0000 Subject: [PATCH 03/11] Bug fixing --- src/humanloop/otel/exporter.py | 39 +++++++++--- src/humanloop/otel/processor.py | 106 ++++++++++++++++++++++++++------ 2 files changed, 115 insertions(+), 30 deletions(-) diff --git a/src/humanloop/otel/exporter.py b/src/humanloop/otel/exporter.py index 730f7a16..60ba41bb 100644 --- a/src/humanloop/otel/exporter.py +++ b/src/humanloop/otel/exporter.py @@ -108,8 +108,9 @@ def is_evaluated_file( ), ) logger.debug( - "Span %s with EvaluationContext %s added to upload queue", - span.attributes, + "[HumanloopSpanExporter] Span %s %s with EvaluationContext %s added to upload queue", + span.context.span_id, + span.name, evaluation_context_copy, ) # Reset the EvaluationContext so run eval does not @@ -119,7 +120,7 @@ def is_evaluated_file( evaluation_context, ): logger.debug( - "EvaluationContext %s marked as exhausted for Log in Span %s", + "[HumanloopSpanExporter] EvaluationContext %s marked as exhausted for Log in Span %s", evaluation_context, spans[0].attributes, ) @@ -127,16 +128,16 @@ def is_evaluated_file( self._client.evaluation_context_variable.set(None) return SpanExportResult.SUCCESS else: - logger.warning("HumanloopSpanExporter is shutting down, not accepting new spans") + logger.warning("[HumanloopSpanExporter] Shutting down, not accepting new spans") return SpanExportResult.FAILURE def shutdown(self) -> None: self._shutdown = True for thread in self._threads: thread.join() - logger.debug("Exporter Thread %s joined", thread.ident) + logger.debug("[HumanloopSpanExporter] Exporter Thread %s joined", thread.ident) - def force_flush(self, timeout_millis: int = 3000) -> bool: + def force_flush(self, timeout_millis: int = 10000) -> bool: self._shutdown = True for thread in self._threads: thread.join(timeout=timeout_millis) @@ -179,14 +180,20 @@ def _do_work(self): # Span is not part of a Flow Log self._export_span_dispatch(span_to_export) logger.debug( - "_do_work on Thread %s: Starting to upload span %s", + "[HumanloopSpanExporter] _do_work on Thread %s: Dispatching span %s %s", threading.get_ident(), - span_to_export, + span_to_export.context.span_id, + span_to_export.name, ) elif span_to_export.parent.span_id in self._span_id_to_uploaded_log_id: # Span is part of a Flow and its parent has been uploaded self._export_span_dispatch(span_to_export) - logger.debug("_do_work on Thread %s: Starting to upload span %s", threading.get_ident(), span_to_export) + logger.debug( + "[HumanloopSpanExporter] _do_work on Thread %s: Dispatching span %s %s", + threading.get_ident(), + span_to_export.context.span_id, + span_to_export.name, + ) else: # Requeue the Span and upload after its parent self._upload_queue.put((span_to_export, evaluation_context)) @@ -200,7 +207,7 @@ def _complete_flow_log(self, span_id: int) -> None: # All logs in the Trace have been uploaded, mark the Flow Log as complete flow_log_id = self._span_id_to_uploaded_log_id[flow_log_span_id] self._client.flows.update_log(log_id=flow_log_id, trace_status="complete") - break + break def _export_span_dispatch(self, span: ReadableSpan) -> None: hl_file = read_from_opentelemetry_span(span, key=HUMANLOOP_FILE_KEY) @@ -208,8 +215,20 @@ def _export_span_dispatch(self, span: ReadableSpan) -> None: parent_span_id = span.parent.span_id if span.parent else None while parent_span_id and 
self._span_id_to_uploaded_log_id.get(parent_span_id) is None: + logger.debug( + "[HumanloopSpanExporter] Span %s %s waiting for parent %s to be uploaded", + span.context.span_id, + span.name, + parent_span_id, + ) time.sleep(0.1) + logger.debug( + "[HumanloopSpanExporter] Exporting span %s with file type %s", + span, + file_type, + ) + if file_type == "prompt": export_func = self._export_prompt elif file_type == "tool": diff --git a/src/humanloop/otel/processor.py b/src/humanloop/otel/processor.py index b42ec40b..f775768e 100644 --- a/src/humanloop/otel/processor.py +++ b/src/humanloop/otel/processor.py @@ -1,6 +1,8 @@ +from concurrent.futures import ThreadPoolExecutor import logging from collections import defaultdict -from typing import Any +import time +from typing import Any, TypedDict from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.export import SimpleSpanProcessor, SpanExporter @@ -23,6 +25,11 @@ logger = logging.getLogger("humanloop.sdk") +class CompletableSpan(TypedDict): + span: ReadableSpan + complete: bool + + class HumanloopSpanProcessor(SimpleSpanProcessor): """Enrich Humanloop spans with data from their children spans. @@ -42,43 +49,93 @@ class HumanloopSpanProcessor(SimpleSpanProcessor): def __init__(self, exporter: SpanExporter) -> None: super().__init__(exporter) - # Span parent to Span children map - self._children: dict[int, list] = defaultdict(list) + # span parent to span children map + self._children: dict[int, list[CompletableSpan]] = defaultdict(list) + # List of all span IDs that are contained in a Flow trace + # They are passed to the Exporter as a span attribute + # so the Exporter knows when to complete a trace self._prerequisites: dict[int, list[int]] = {} + self._executor = ThreadPoolExecutor(max_workers=4) + + def shutdown(self): + self._executor.shutdown() + return super().shutdown() def on_start(self, span, parent_context=None): span_id = span.context.span_id + parent_span_id = span.parent.span_id if span.parent else None if span.name == "humanloop.flow": self._prerequisites[span_id] = [] - if span.parent and is_humanloop_span(span): - parent_span_id = span.parent.span_id + if parent_span_id and is_humanloop_span(span): for trace_head, all_trace_nodes in self._prerequisites.items(): if parent_span_id == trace_head or parent_span_id in all_trace_nodes: all_trace_nodes.append(span_id) + break + # Handle stream case: when Prompt instrumented function calls a provider with streaming: true + # The instrumentor span will end only when the ChunksResponse is consumed, which can happen + # after the span created by the Prompt utility finishes. To handle this, we register all instrumentor + # spans belonging to a Humanloop span, and their parent will wait for them to complete in onEnd before + # exporting the Humanloop span. 
+ if parent_span_id and _is_instrumentor_span(span): + if parent_span_id not in self._children: + self._children[parent_span_id] = [] + self._children[parent_span_id].append( + { + "span": span, + "complete": False, + } + ) def on_end(self, span: ReadableSpan) -> None: if is_humanloop_span(span=span): - _process_span_dispatch(span, self._children[span.context.span_id]) - # Release the reference to the Spans as they've already - # been sent to the Exporter - del self._children[span.context.span_id] + # Wait for children to complete asynchronously + self._executor.submit(self._wait_for_children, span=span) + elif span.parent is not None and _is_instrumentor_span(span): + # If this is one of the children spans waited upon, update its completion status + + # Updating the child span status + self._children[span.parent.span_id] = [ + child if child["span"].context.span_id != span.context.span_id else {"span": span, "complete": True} + for child in self._children[span.parent.span_id] + ] + + # Export the instrumentor span + self.span_exporter.export([span]) else: - if span.parent is not None and _is_instrumentor_span(span): - # Copy the Span and keep it until the Humanloop Span - # arrives in order to enrich it - self._children[span.parent.span_id].append(span) - # Pass the Span to the Exporter + # Unknown span, pass it to the Exporter + self.span_exporter.export([span]) + + def _wait_for_children(self, span: ReadableSpan): + """Wait for all children spans to complete before processing the Humanloop span.""" + span_id = span.context.span_id + while not all(child["complete"] for child in self._children[span_id]): + logger.debug( + "[HumanloopSpanProcessor] Span %s %s waiting for children to complete: %s", + span_id, + span.name, + self._children[span_id], + ) + time.sleep(0.1) + # All instrumentor spans have arrived, we can process the + # Humanloop parent span owning them if span.name == "humanloop.flow": write_to_opentelemetry_span( span=span, key=HUMANLOOP_FLOW_PREREQUISITES_KEY, - value=self._prerequisites[span.context.span_id], + value=self._prerequisites[span_id], ) + del self._prerequisites[span_id] + logger.debug("[HumanloopSpanProcessor] Dispatching span %s %s", span_id, span.name) + _process_span_dispatch(span, [child["span"] for child in self._children[span_id]]) + # Release references + del self._children[span_id] + # Pass Humanloop span to Exporter + logger.debug("[HumanloopSpanProcessor] Sending span %s %s to exporter", span_id, span.name) self.span_exporter.export([span]) def _is_instrumentor_span(span: ReadableSpan) -> bool: - """Determine if the Span contains information of interest for Spans created by Humanloop decorators.""" + """Determine if the span contains information of interest for Spans created by Humanloop decorators.""" # At the moment we only enrich Spans created by the Prompt decorators # As we add Instrumentors for other libraries, this function must # be expanded @@ -104,7 +161,11 @@ def _process_span_dispatch(span: ReadableSpan, children_spans: list[ReadableSpan elif file_type == "flow": pass else: - logger.error("Unknown Humanloop File Span %s", span) + logger.error( + "[HumanloopSpanProcessor] Unknown Humanloop File span %s %s", + span.context.span_id, + span.name, + ) def _process_prompt(prompt_span: ReadableSpan, children_spans: list[ReadableSpan]): @@ -150,9 +211,14 @@ def _enrich_prompt_kernel(prompt_span: ReadableSpan, llm_provider_call_span: Rea # Validate the Prompt Kernel PromptKernelRequest.model_validate(obj=prompt) except PydanticValidationError as e: 
- logger.error("Could not validate Prompt Kernel extracted from Span: %s", e) - - # Write the enriched Prompt Kernel back to the Span + logger.error( + "[HumanloopSpanProcessor] Could not validate Prompt Kernel extracted from span: %s %s. Error: %s", + prompt_span.context.span_id, + prompt_span.name, + e, + ) + + # Write the enriched Prompt Kernel back to the span hl_file["prompt"] = prompt write_to_opentelemetry_span( span=prompt_span, From 7d8d7de4243d2679e6733dda59e85bd3ab545b46 Mon Sep 17 00:00:00 2001 From: Andrei Bratu Date: Mon, 6 Jan 2025 16:26:24 +0000 Subject: [PATCH 04/11] QA nits from CI --- pyproject.toml | 3 +++ src/humanloop/otel/exporter.py | 21 +++++++++++++++------ tests/utilities/test_flow_decorator.py | 8 ++++++++ tests/utilities/test_prompt_decorator.py | 21 +++++++++++++++++++++ tests/utilities/test_tool_decorator.py | 4 ++++ 5 files changed, 51 insertions(+), 6 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index e559a681..2d113069 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,6 @@ +[project] +name = "humanloop" + [tool.poetry] name = "humanloop" version = "0.8.22" diff --git a/src/humanloop/otel/exporter.py b/src/humanloop/otel/exporter.py index 60ba41bb..719d035c 100644 --- a/src/humanloop/otel/exporter.py +++ b/src/humanloop/otel/exporter.py @@ -206,7 +206,13 @@ def _complete_flow_log(self, span_id: int) -> None: if len(flow_children_span_ids) == 0: # All logs in the Trace have been uploaded, mark the Flow Log as complete flow_log_id = self._span_id_to_uploaded_log_id[flow_log_span_id] - self._client.flows.update_log(log_id=flow_log_id, trace_status="complete") + if flow_log_id is None: + logger.error( + "[HumanloopSpanExporter] Cannot complete Flow log %s, log ID is None", + flow_log_span_id, + ) + else: + self._client.flows.update_log(log_id=flow_log_id, trace_status="complete") break def _export_span_dispatch(self, span: ReadableSpan) -> None: @@ -324,11 +330,14 @@ def _export_flow(self, span: ReadableSpan) -> None: key=HUMANLOOP_LOG_KEY, ) # Spans that must be uploaded before the Flow Span is completed - prerequisites = read_from_opentelemetry_span( - span=span, - key=HUMANLOOP_FLOW_PREREQUISITES_KEY, - ) - self._flow_log_prerequisites[span.context.span_id] = set(prerequisites) + try: + prerequisites: list[int] = read_from_opentelemetry_span( # type: ignore + span=span, + key=HUMANLOOP_FLOW_PREREQUISITES_KEY, + ) + self._flow_log_prerequisites[span.context.span_id] = set(prerequisites) + except KeyError: + self._flow_log_prerequisites[span.context.span_id] = set() path: str = file_object["path"] flow: FlowKernelRequestParams diff --git a/tests/utilities/test_flow_decorator.py b/tests/utilities/test_flow_decorator.py index 95e708a0..b84c2208 100644 --- a/tests/utilities/test_flow_decorator.py +++ b/tests/utilities/test_flow_decorator.py @@ -84,11 +84,15 @@ def test_decorators_without_flow( ] ) # WHEN exporting the spans + # Wait for the prompt span to be exported; It was waiting + # on the OpenAI call span to finish first + time.sleep(1) spans = exporter.get_finished_spans() # THEN 3 spans arrive at the exporter in the following order: # 0. Intercepted OpenAI call, which is ignored by the exporter # 1. Tool Span (called after the OpenAI call but before the Prompt Span finishes) # 2. 
Prompt Span + print("WOW", [span.name for span in spans]) assert len(spans) == 3 assert read_from_opentelemetry_span( span=spans[1], @@ -146,6 +150,10 @@ def test_flow_decorator_flow_in_flow( # WHEN Calling the _test_flow_in_flow function with specific messages _flow_over_flow(call_llm_messages) + # Wait for the Prompt span to be exported; It was asynchronously waiting + # on the OpenAI call span to finish first + time.sleep(1) + # THEN 5 spans are arrive at the exporter in the following order: # 0. Intercepted OpenAI call, which is ignored by the exporter # 1. Tool Span (called after the OpenAI call but before the Prompt Span finishes) diff --git a/tests/utilities/test_prompt_decorator.py b/tests/utilities/test_prompt_decorator.py index 84705a67..96bffeda 100644 --- a/tests/utilities/test_prompt_decorator.py +++ b/tests/utilities/test_prompt_decorator.py @@ -1,4 +1,5 @@ import os +import time from typing import Optional import cohere @@ -158,6 +159,11 @@ def test_prompt_decorator( model=model, messages=call_llm_messages, ) + + # Wait for the Prompt span to be exported, it is waiting + # asynchronously for the LLM provider call span to finish + time.sleep(1) + # THEN two spans are created: one for the OpenAI LLM provider call and one for the Prompt spans = exporter.get_finished_spans() assert len(spans) == 2 @@ -189,7 +195,13 @@ def test_prompt_decorator_with_hl_processor( model=model, messages=call_llm_messages, ) + # THEN two spans are created: one for the OpenAI LLM provider call and one for the Prompt + + # Wait for the Prompt span to be exported, it is waiting + # asynchronously for the LLM provider call span to finish + time.sleep(1) + spans = exporter.get_finished_spans() assert len(spans) == 2 assert not is_humanloop_span(span=spans[0]) @@ -237,6 +249,11 @@ def test_prompt_decorator_with_defaults( model=model, messages=call_llm_messages, ) + + # Wait for the Prompt span to be exported, it is waiting + # asynchronously for the LLM provider call span to finish + time.sleep(1) + spans = exporter.get_finished_spans() # THEN the Prompt span is enhanced with information and forms a correct PromptKernel prompt = PromptKernelRequest.model_validate( @@ -289,6 +306,10 @@ def test_prompt_attributes( messages=call_llm_messages, ) + # Wait for the Prompt span to be exported, it is waiting + # asynchronously for the LLM provider call span to finish + time.sleep(1) + assert len(exporter.get_finished_spans()) == 2 prompt_kernel = PromptKernelRequest.model_validate( diff --git a/tests/utilities/test_tool_decorator.py b/tests/utilities/test_tool_decorator.py index 29cc8fe1..983c93f6 100644 --- a/tests/utilities/test_tool_decorator.py +++ b/tests/utilities/test_tool_decorator.py @@ -1,4 +1,5 @@ import sys +import time from typing import Any, Optional, TypedDict, Union import pytest @@ -450,6 +451,9 @@ def calculator(operation: str, num1: float, num2: float) -> float: higher_order_fn_tool(operation="add", num1=1, num2=2) calculator(operation="add", num1=1, num2=2) + # Processor handles HL spans asynchronously, wait for them + time.sleep(1) + assert len(spans := exporter.get_finished_spans()) == 2 hl_file_higher_order_fn = read_from_opentelemetry_span( From 3a9e56d168e5c4cebc654f4e376c2b956ca6ef57 Mon Sep 17 00:00:00 2001 From: Andrei Bratu Date: Mon, 6 Jan 2025 16:43:43 +0000 Subject: [PATCH 05/11] Ordering of spans is no longer guaranteed fix --- tests/utilities/test_flow_decorator.py | 35 +++++++++++++++----------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git 
a/tests/utilities/test_flow_decorator.py b/tests/utilities/test_flow_decorator.py index b84c2208..58fe7364 100644 --- a/tests/utilities/test_flow_decorator.py +++ b/tests/utilities/test_flow_decorator.py @@ -2,19 +2,21 @@ import random import string import time -from unittest.mock import patch +from unittest.mock import patch import pytest +from openai import OpenAI +from openai.types.chat.chat_completion_message_param import ChatCompletionMessageParam +from opentelemetry.sdk.trace import Tracer +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +from opentelemetry.sdk.trace import ReadableSpan + from humanloop.utilities.flow import flow from humanloop.utilities.prompt import prompt from humanloop.utilities.tool import tool from humanloop.otel.constants import HUMANLOOP_FILE_KEY from humanloop.otel.exporter import HumanloopSpanExporter from humanloop.otel.helpers import read_from_opentelemetry_span -from openai import OpenAI -from openai.types.chat.chat_completion_message_param import ChatCompletionMessageParam -from opentelemetry.sdk.trace import Tracer -from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter def _test_scenario( @@ -255,14 +257,17 @@ def test_flow_decorator_hl_exporter_flow_inside_flow( time.sleep(3) # THEN 5 spans are arrive at the exporter in the following order: - # 0. Intercepted OpenAI call, which is ignored by the exporter - # 1. Tool Span (called after the OpenAI call but before the Prompt Span finishes) - # 2. Prompt Span - # 3. Nested Flow Span - # 4. Flow Span assert len(mock_export_method.call_args_list) == 5 - # THEN the last uploaded span is the larger Flow - # THEN the second to last uploaded span is the nested Flow - flow_span = mock_export_method.call_args_list[4][0][0][0] - nested_flow_span = mock_export_method.call_args_list[3][0][0][0] - assert nested_flow_span.parent.span_id == flow_span.context.span_id + + # THEN one of the flows is nested inside the other + spans: list[ReadableSpan] = [mock_export_method.call_args_list[i][0][0][0] for i in range(1, 5)] + counter = 0 + for span in spans: + if span.name == "humanloop.flow": + counter += 1 + if span.parent: + nested_flow_span = span + else: + flow_span = span + # We are certain span_id exists for these 2 spans + assert nested_flow_span.parent.span_id == flow_span.context.span_id # type: ignore From 432e41395156b6a00143377d3172976eb014d450 Mon Sep 17 00:00:00 2001 From: Andrei Bratu Date: Mon, 6 Jan 2025 16:53:02 +0000 Subject: [PATCH 06/11] removed print statement --- tests/utilities/test_flow_decorator.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/utilities/test_flow_decorator.py b/tests/utilities/test_flow_decorator.py index 58fe7364..ca2fcfcb 100644 --- a/tests/utilities/test_flow_decorator.py +++ b/tests/utilities/test_flow_decorator.py @@ -94,7 +94,6 @@ def test_decorators_without_flow( # 0. Intercepted OpenAI call, which is ignored by the exporter # 1. Tool Span (called after the OpenAI call but before the Prompt Span finishes) # 2. 
Prompt Span - print("WOW", [span.name for span in spans]) assert len(spans) == 3 assert read_from_opentelemetry_span( span=spans[1], From 11ff653cbd81717d9c9cafc996045b055c27b509 Mon Sep 17 00:00:00 2001 From: Andrei Bratu Date: Tue, 7 Jan 2025 11:31:01 +0000 Subject: [PATCH 07/11] Removed dependency on order of spans arrival in flow tests --- tests/utilities/test_flow_decorator.py | 91 +++++++++++++++----------- 1 file changed, 53 insertions(+), 38 deletions(-) diff --git a/tests/utilities/test_flow_decorator.py b/tests/utilities/test_flow_decorator.py index ca2fcfcb..d51423cd 100644 --- a/tests/utilities/test_flow_decorator.py +++ b/tests/utilities/test_flow_decorator.py @@ -90,17 +90,22 @@ def test_decorators_without_flow( # on the OpenAI call span to finish first time.sleep(1) spans = exporter.get_finished_spans() - # THEN 3 spans arrive at the exporter in the following order: - # 0. Intercepted OpenAI call, which is ignored by the exporter - # 1. Tool Span (called after the OpenAI call but before the Prompt Span finishes) - # 2. Prompt Span + + # THEN 3 spans arrive at the exporter: assert len(spans) == 3 + + for i in range(3): + if spans[i].name == "humanloop.tool": + tool_span = spans[i] + elif spans[i].name == "humanloop.prompt": + prompt_span = spans[i] + assert read_from_opentelemetry_span( - span=spans[1], + span=tool_span, key=HUMANLOOP_FILE_KEY, )["tool"] assert read_from_opentelemetry_span( - span=spans[2], + span=prompt_span, key=HUMANLOOP_FILE_KEY, )["prompt"] @@ -126,17 +131,23 @@ def test_decorators_with_flow_decorator( }, ] ) - # THEN 4 spans arrive at the exporter in the following order: - # 0. Intercepted OpenAI call, which is ignored by the exporter - # 1. Tool Span (called after the OpenAI call but before the Prompt Span finishes) - # 2. Prompt Span - # 3. Flow Span - spans = exporter.get_finished_spans() + + # THEN 4 spans arrive at the exporter: + spans: list[ReadableSpan] = exporter.get_finished_spans() assert len(spans) == 4 + + for i in range(4): + if spans[i].name == "humanloop.flow": + flow_span = spans[i] + elif spans[i].name == "humanloop.prompt": + prompt_span = spans[i] + elif spans[i].name == "humanloop.tool": + tool_span = spans[i] + # THEN the span are returned bottom to top - assert read_from_opentelemetry_span(span=spans[1], key=HUMANLOOP_FILE_KEY)["tool"] - assert read_from_opentelemetry_span(span=spans[2], key=HUMANLOOP_FILE_KEY)["prompt"] - assert read_from_opentelemetry_span(span=spans[3], key=HUMANLOOP_FILE_KEY)["flow"] + assert read_from_opentelemetry_span(span=tool_span, key=HUMANLOOP_FILE_KEY)["tool"] + assert read_from_opentelemetry_span(span=prompt_span, key=HUMANLOOP_FILE_KEY)["prompt"] + assert read_from_opentelemetry_span(span=flow_span, key=HUMANLOOP_FILE_KEY)["flow"] def test_flow_decorator_flow_in_flow( @@ -155,19 +166,25 @@ def test_flow_decorator_flow_in_flow( # on the OpenAI call span to finish first time.sleep(1) - # THEN 5 spans are arrive at the exporter in the following order: - # 0. Intercepted OpenAI call, which is ignored by the exporter - # 1. Tool Span (called after the OpenAI call but before the Prompt Span finishes) - # 2. Prompt Span - # 3. Nested Flow Span - # 4. 
Flow Span - spans = exporter.get_finished_spans() + # THEN 5 spans arrive at the exporter + spans: list[ReadableSpan] = exporter.get_finished_spans() assert len(spans) == 5 - assert read_from_opentelemetry_span(span=spans[1], key=HUMANLOOP_FILE_KEY)["tool"] - assert read_from_opentelemetry_span(span=spans[2], key=HUMANLOOP_FILE_KEY)["prompt"] - assert read_from_opentelemetry_span(span=spans[3], key=HUMANLOOP_FILE_KEY)["flow"] != {} + + for i in range(5): + if spans[i].name == "humanloop.flow" and spans[i].parent is None: + flow_span = spans[i] + elif spans[i].name == "humanloop.flow" and spans[i].parent: + nested_flow_span = spans[i] + elif spans[i].name == "humanloop.prompt": + prompt_span = spans[i] + elif spans[i].name == "humanloop.tool": + tool_span = spans[i] + + assert read_from_opentelemetry_span(span=tool_span, key=HUMANLOOP_FILE_KEY)["tool"] + assert read_from_opentelemetry_span(span=prompt_span, key=HUMANLOOP_FILE_KEY)["prompt"] + assert read_from_opentelemetry_span(span=nested_flow_span, key=HUMANLOOP_FILE_KEY)["flow"] != {} with pytest.raises(KeyError): - read_from_opentelemetry_span(span=spans[4], key=HUMANLOOP_FILE_KEY)["flow"] != {} + read_from_opentelemetry_span(span=flow_span, key=HUMANLOOP_FILE_KEY)["flow"] != {} def test_flow_decorator_with_hl_exporter( @@ -187,17 +204,17 @@ def test_flow_decorator_with_hl_exporter( # Exporter is threaded, need to wait threads shutdown time.sleep(3) - # THEN 4 spans are arrive at the exporter in the following order: - # 0. Intercepted OpenAI call, which is ignored by the exporter - # 1. Tool Span (called after the OpenAI call but before the Prompt Span finishes) - # 2. Prompt Span - # 3. Flow Span assert len(mock_export_method.call_args_list) == 4 - tool_span = mock_export_method.call_args_list[1][0][0][0] - prompt_span = mock_export_method.call_args_list[2][0][0][0] - flow_span = mock_export_method.call_args_list[3][0][0][0] - # THEN the last uploaded span is the Flow + for i in range(4): + span = mock_export_method.call_args_list[i][0][0][0] + if span.name == "humanloop.flow": + flow_span = span + elif span.name == "humanloop.prompt": + prompt_span = span + elif span.name == "humanloop.tool": + tool_span = span + assert read_from_opentelemetry_span( span=flow_span, key=HUMANLOOP_FILE_KEY, @@ -216,8 +233,6 @@ def test_flow_decorator_with_hl_exporter( key=HUMANLOOP_FILE_KEY, ) - # NOTE: The type: ignore comments are caused by the MagicMock used to mock the HTTP client - # THEN the first Log uploaded is the Flow first_log = exporter._client.flows.log.call_args_list[0][1] # type: ignore assert "flow" in first_log @@ -255,7 +270,7 @@ def test_flow_decorator_hl_exporter_flow_inside_flow( # Exporter is threaded, need to wait threads shutdown time.sleep(3) - # THEN 5 spans are arrive at the exporter in the following order: + # THEN 5 spans are arrive at the exporter assert len(mock_export_method.call_args_list) == 5 # THEN one of the flows is nested inside the other From 637916af14479564dd11bc86d69f738e67b70130 Mon Sep 17 00:00:00 2001 From: Andrei Bratu Date: Tue, 7 Jan 2025 11:57:10 +0000 Subject: [PATCH 08/11] fixed typing errors --- src/humanloop/otel/helpers.py | 3 --- tests/utilities/test_flow_decorator.py | 4 ++-- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/src/humanloop/otel/helpers.py b/src/humanloop/otel/helpers.py index c620d577..9e645144 100644 --- a/src/humanloop/otel/helpers.py +++ b/src/humanloop/otel/helpers.py @@ -1,13 +1,10 @@ import json -import uuid from typing import Any, Callable, Union from 
opentelemetry.sdk.trace import ReadableSpan from opentelemetry.trace import SpanKind from opentelemetry.util.types import AttributeValue -from humanloop.otel.constants import HUMANLOOP_FILE_KEY, HUMANLOOP_LOG_KEY - NestedDict = dict[str, Union["NestedDict", AttributeValue]] NestedList = list[Union["NestedList", NestedDict]] diff --git a/tests/utilities/test_flow_decorator.py b/tests/utilities/test_flow_decorator.py index d51423cd..41e9fc6d 100644 --- a/tests/utilities/test_flow_decorator.py +++ b/tests/utilities/test_flow_decorator.py @@ -133,7 +133,7 @@ def test_decorators_with_flow_decorator( ) # THEN 4 spans arrive at the exporter: - spans: list[ReadableSpan] = exporter.get_finished_spans() + spans: tuple[ReadableSpan] = exporter.get_finished_spans() assert len(spans) == 4 for i in range(4): @@ -167,7 +167,7 @@ def test_flow_decorator_flow_in_flow( time.sleep(1) # THEN 5 spans arrive at the exporter - spans: list[ReadableSpan] = exporter.get_finished_spans() + spans: tuple[ReadableSpan] = exporter.get_finished_spans() assert len(spans) == 5 for i in range(5): From 0c4b2876fd11f6c7c01551511240fd65d2244491 Mon Sep 17 00:00:00 2001 From: Andrei Bratu Date: Tue, 7 Jan 2025 11:59:41 +0000 Subject: [PATCH 09/11] Fixed type complaints for real --- tests/utilities/test_flow_decorator.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/utilities/test_flow_decorator.py b/tests/utilities/test_flow_decorator.py index 41e9fc6d..da895ee0 100644 --- a/tests/utilities/test_flow_decorator.py +++ b/tests/utilities/test_flow_decorator.py @@ -91,7 +91,7 @@ def test_decorators_without_flow( time.sleep(1) spans = exporter.get_finished_spans() - # THEN 3 spans arrive at the exporter: + # THEN 3 spans arrive at the exporter assert len(spans) == 3 for i in range(3): @@ -132,8 +132,8 @@ def test_decorators_with_flow_decorator( ] ) - # THEN 4 spans arrive at the exporter: - spans: tuple[ReadableSpan] = exporter.get_finished_spans() + # THEN 4 spans arrive at the exporter + spans = exporter.get_finished_spans() assert len(spans) == 4 for i in range(4): @@ -167,7 +167,7 @@ def test_flow_decorator_flow_in_flow( time.sleep(1) # THEN 5 spans arrive at the exporter - spans: tuple[ReadableSpan] = exporter.get_finished_spans() + spans = exporter.get_finished_spans() assert len(spans) == 5 for i in range(5): From 236b52c306db6ddb88873c93c551c602c2da9755 Mon Sep 17 00:00:00 2001 From: Andrei Bratu Date: Tue, 7 Jan 2025 18:46:27 +0000 Subject: [PATCH 10/11] renamed span utility function --- src/humanloop/otel/exporter.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/humanloop/otel/exporter.py b/src/humanloop/otel/exporter.py index 719d035c..8f4334f0 100644 --- a/src/humanloop/otel/exporter.py +++ b/src/humanloop/otel/exporter.py @@ -199,7 +199,7 @@ def _do_work(self): self._upload_queue.put((span_to_export, evaluation_context)) self._upload_queue.task_done() - def _complete_flow_log(self, span_id: int) -> None: + def _mark_span_completed(self, span_id: int) -> None: for flow_log_span_id, flow_children_span_ids in self._flow_log_prerequisites.items(): if span_id in flow_children_span_ids: flow_children_span_ids.remove(span_id) @@ -282,7 +282,7 @@ def _export_prompt(self, span: ReadableSpan) -> None: self._span_id_to_uploaded_log_id[span.context.span_id] = log_response.id except HumanloopApiError: self._span_id_to_uploaded_log_id[span.context.span_id] = None - self._complete_flow_log(span_id=span.context.span_id) + 
self._mark_span_completed(span_id=span.context.span_id) def _export_tool(self, span: ReadableSpan) -> None: file_object: dict[str, Any] = read_from_opentelemetry_span( @@ -318,7 +318,7 @@ def _export_tool(self, span: ReadableSpan) -> None: self._span_id_to_uploaded_log_id[span.context.span_id] = log_response.id except HumanloopApiError: self._span_id_to_uploaded_log_id[span.context.span_id] = None - self._complete_flow_log(span_id=span.context.span_id) + self._mark_span_completed(span_id=span.context.span_id) def _export_flow(self, span: ReadableSpan) -> None: file_object: dict[str, Any] = read_from_opentelemetry_span( @@ -362,4 +362,4 @@ def _export_flow(self, span: ReadableSpan) -> None: except HumanloopApiError as e: logger.error(str(e)) self._span_id_to_uploaded_log_id[span.context.span_id] = None - self._complete_flow_log(span_id=span.context.span_id) + self._mark_span_completed(span_id=span.context.span_id) From a9f4bc8fba386d3509afb705c2990e3673fba499 Mon Sep 17 00:00:00 2001 From: Andrei Bratu Date: Thu, 9 Jan 2025 06:58:56 +0000 Subject: [PATCH 11/11] Removed needless time.sleep --- src/humanloop/otel/exporter.py | 4 ++-- src/humanloop/otel/processor.py | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/humanloop/otel/exporter.py b/src/humanloop/otel/exporter.py index 8f4334f0..544d2e7b 100644 --- a/src/humanloop/otel/exporter.py +++ b/src/humanloop/otel/exporter.py @@ -222,12 +222,12 @@ def _export_span_dispatch(self, span: ReadableSpan) -> None: while parent_span_id and self._span_id_to_uploaded_log_id.get(parent_span_id) is None: logger.debug( - "[HumanloopSpanExporter] Span %s %s waiting for parent %s to be uploaded", + "[HumanloopSpanExporter] _export_span_dispatch on Thread %s Span %s %s waiting for parent %s to be uploaded", + threading.get_ident(), span.context.span_id, span.name, parent_span_id, ) - time.sleep(0.1) logger.debug( "[HumanloopSpanExporter] Exporting span %s with file type %s", diff --git a/src/humanloop/otel/processor.py b/src/humanloop/otel/processor.py index f775768e..d027cb35 100644 --- a/src/humanloop/otel/processor.py +++ b/src/humanloop/otel/processor.py @@ -115,7 +115,6 @@ def _wait_for_children(self, span: ReadableSpan): span.name, self._children[span_id], ) - time.sleep(0.1) # All instrumentor spans have arrived, we can process the # Humanloop parent span owning them if span.name == "humanloop.flow":
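
The patch series above introduces trace-completion bookkeeping in the exporter: the Flow span records the IDs of the child spans it is waiting on (via `HUMANLOOP_FLOW_PREREQUISITES_KEY` / `_flow_log_prerequisites`), and `_mark_span_completed` marks the Flow Log complete once every child Log has been uploaded. The following is a minimal standalone sketch of that idea, not part of the patch; `FlowCompletionTracker`, `register_flow`, and `complete_flow_fn` are illustrative names, with `complete_flow_fn` standing in for the real `flows.update_log(..., trace_status="complete")` call.

```python
from typing import Callable


class FlowCompletionTracker:
    """Sketch of the exporter's _flow_log_prerequisites bookkeeping."""

    def __init__(self, complete_flow_fn: Callable[[int], None]) -> None:
        # Called with the Flow span ID once every child Log has been uploaded.
        self._complete_flow_fn = complete_flow_fn
        # Flow span ID -> span IDs that must be uploaded before completion.
        self._prerequisites: dict[int, set[int]] = {}

    def register_flow(self, flow_span_id: int, child_span_ids: set[int]) -> None:
        self._prerequisites[flow_span_id] = set(child_span_ids)

    def mark_span_completed(self, span_id: int) -> None:
        # Mirrors _mark_span_completed: drop the span from its trace's
        # prerequisites and complete the Flow once nothing is left.
        for flow_span_id, remaining in self._prerequisites.items():
            if span_id in remaining:
                remaining.remove(span_id)
                if not remaining:
                    self._complete_flow_fn(flow_span_id)
                break


completed: list[int] = []
tracker = FlowCompletionTracker(completed.append)
tracker.register_flow(flow_span_id=1, child_span_ids={2, 3})
tracker.mark_span_completed(2)
tracker.mark_span_completed(3)
assert completed == [1]  # Flow completes only after both children are uploaded
```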
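
The processor side of the same change handles the streaming case described in the patch: an instrumentor (provider-call) span can end after its Humanloop parent, so the parent is only enriched and exported once all of its registered children have finished. Below is a simplified sketch of that gating logic, again not part of the patch; `ParentSpanGate` and its method names are illustrative, and the sketch sleeps between checks the way earlier revisions in this series did.

```python
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


class ParentSpanGate:
    """Sketch of HumanloopSpanProcessor's wait-for-children behaviour."""

    def __init__(self, export: Callable[[int], None]) -> None:
        self._export = export
        # Parent span ID -> {child span ID: completed?}
        self._children: dict[int, dict[int, bool]] = defaultdict(dict)
        self._executor = ThreadPoolExecutor(max_workers=4)

    def on_child_start(self, parent_id: int, child_id: int) -> None:
        self._children[parent_id][child_id] = False

    def on_child_end(self, parent_id: int, child_id: int) -> None:
        self._children[parent_id][child_id] = True

    def on_parent_end(self, parent_id: int) -> None:
        # Export asynchronously so ending the parent span does not block.
        self._executor.submit(self._wait_then_export, parent_id)

    def _wait_then_export(self, parent_id: int) -> None:
        while not all(self._children[parent_id].values()):
            time.sleep(0.01)
        self._export(parent_id)
        # Release the references once the parent has been exported.
        del self._children[parent_id]
```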