86 changes: 37 additions & 49 deletions src/humanloop/eval_utils.py
@@ -18,6 +18,7 @@
from typing_extensions import NotRequired, TypedDict
import time
import sys
import json
from concurrent.futures import ThreadPoolExecutor, as_completed

from .client import BaseHumanloop
@@ -34,7 +35,7 @@
from .requests import HumanEvaluatorRequestParams as HumanEvaluatorDict


# Responses are Pydantic models and we leverage them for improved request validation
# Responses are Pydantic models; we leverage them for improved request validation
from .types import FlowKernelRequest as Flow
from .types import PromptKernelRequest as Prompt
from .types import ToolKernelRequest as Tool
@@ -97,22 +98,12 @@ class File(Identifiers):
"""The function being evaluated.
It will be called using your Dataset `inputs` as follows: `output = callable(**datapoint.inputs)`.
If `messages` are defined in your Dataset, then `output = callable(**datapoint.inputs, messages=datapoint.messages)`.
It should return a single string output. If not, you must provide a `custom_logger`.
"""
custom_logger: NotRequired[Callable]
"""function that logs the output of your function to Humanloop, replacing the default logging.
If provided, it will be called as follows:
```
output = callable(**datapoint.inputs)
log = custom_logger(client, output)
```
Inside the custom_logger, you can use the Humanloop `client` to log the output of your function.
If not provided, your pipeline must return a single string.
It should return a string or JSON-serializable output.
"""


class Dataset(Identifiers):
datapoints: Sequence[DatapointDict]
datapoints: NotRequired[Sequence[DatapointDict]]
"""The datapoints to map your function over to produce the outputs required by the evaluation."""
action: NotRequired[UpdateDatasetAction]
"""How to update the Dataset given the provided Datapoints;
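As a sketch, a `Dataset` dict under these definitions might look as follows; the `path` and datapoint values are hypothetical, and the datapoint shape is assumed to follow `DatapointDict` with an `inputs` mapping:

```
# Hypothetical Dataset spec. Per the upsert logic later in this diff, "set"
# is the server-default action and "add" appends to the existing datapoints.
dataset: Dataset = {
    "path": "qa/golden-set",
    "datapoints": [
        {"inputs": {"question": "What is the capital of France?"}},
    ],
    "action": "set",
}
```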
@@ -128,15 +119,6 @@ class Evaluator(Identifiers):
"""The type of return value the Evaluator produces - only required for local Evaluators."""
callable: NotRequired[Callable]
"""The function to run on the logs to produce the judgment - only required for local Evaluators."""
custom_logger: NotRequired[Callable]
"""optional function that logs the output judgment from your Evaluator to Humanloop, if provided, it will be called as follows:
```
judgment = callable(log_dict)
log = custom_logger(client, judgment)
```
Inside the custom_logger, you can use the Humanloop `client` to log the judgment to Humanloop.
If not provided, your function must return a single string; by default, the code will be used to inform the version of the external Evaluator on Humanloop.
"""
threshold: NotRequired[float]
"""The threshold to check the Evaluator against. If the aggregate value of the Evaluator is below this threshold, the check will fail."""

@@ -213,7 +195,6 @@ def _run_eval(
f"No `callable` provided for your {type_} file - will attempt to generate logs on Humanloop."
)

custom_logger = file.pop("custom_logger", None)
file_dict = {**file, **version}

if type_ == "flow":
@@ -252,8 +233,17 @@
raise NotImplementedError(f"Unsupported File type: {type_}")

# Upsert the Dataset
hl_dataset = client.datasets.upsert(**dataset)
hl_dataset = client.datasets.get(id=hl_dataset.id, include_datapoints=True)
# Pop `action` so `**dataset` below does not pass it a second time;
# "set" is the server default and None is not allowed.
action = dataset.pop("action", "set")
if "datapoints" not in dataset:
    # Use `upsert` to get the existing dataset ID if no datapoints provided,
    # given we can't `get` on path.
    dataset["datapoints"] = []
    action = "add"
hl_dataset = client.datasets.upsert(**dataset, action=action)
hl_dataset = client.datasets.get(
id=hl_dataset.id,
version_id=hl_dataset.version_id,
include_datapoints=True
)
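To make the branch above concrete, a sketch of the two call patterns, assuming a configured `client` and using only the `upsert`/`get` calls visible in this diff (path and datapoint values are hypothetical):

```
# 1) Datapoints provided: the caller's `action` (server default "set") applies.
hl_dataset = client.datasets.upsert(
    path="qa/golden-set",
    datapoints=[{"inputs": {"question": "What is the capital of France?"}}],
    action="set",
)

# 2) No datapoints: an empty "add" upsert resolves the Dataset by path without
# touching its stored datapoints; `get` then pins the exact version.
hl_dataset = client.datasets.upsert(path="qa/golden-set", datapoints=[], action="add")
hl_dataset = client.datasets.get(
    id=hl_dataset.id,
    version_id=hl_dataset.version_id,
    include_datapoints=True,
)
```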

# Upsert the local Evaluators; other Evaluators are just referenced by `path` or `id`
local_evaluators: List[Evaluator] = []
@@ -341,20 +331,22 @@ def process_datapoint(datapoint: Datapoint):
)
else:
output = function_(**datapoint_dict["inputs"])
if custom_logger:
log = custom_logger(client=client, output=output)
else:
if not isinstance(output, str):

if not isinstance(output, str):
try:
output = json.dumps(output)
# Raise a clear error if the output fails to serialize.
except Exception:
raise ValueError(
f"Your {type_}'s `callable` must return a string if you do not provide a custom logger."
f"Your {type_}'s `callable` must return a string or a JSON serializable object."
)
log = log_func(
inputs=datapoint.inputs,
output=output,
source_datapoint_id=datapoint.id,
start_time=start_time,
end_time=datetime.now(),
)
log = log_func(
inputs=datapoint.inputs,
output=output,
source_datapoint_id=datapoint.id,
start_time=start_time,
end_time=datetime.now(),
)
except Exception as e:
log = log_func(
inputs=datapoint.inputs,
@@ -377,18 +369,14 @@ def process_datapoint(datapoint: Datapoint):
else:
judgment = eval_function(log.dict())

if local_evaluator.get("custom_logger", None):
local_evaluator["custom_logger"](client=client, judgment=judgment)
else:
# The API call will validate the judgment
_ = client.evaluators.log(
parent_id=log.id,
id=local_evaluator.get("id"),
path=local_evaluator.get("path"),
judgment=judgment,
start_time=start_time,
end_time=datetime.now(),
)
_ = client.evaluators.log(
parent_id=log.id,
id=local_evaluator.get("id"),
path=local_evaluator.get("path"),
judgment=judgment,
start_time=start_time,
end_time=datetime.now(),
)
except Exception as e:
_ = client.evaluators.log(
parent_id=log.id,