diff --git a/src/humanloop/eval_utils.py b/src/humanloop/eval_utils.py
index 493b57a9..b13d6c3b 100644
--- a/src/humanloop/eval_utils.py
+++ b/src/humanloop/eval_utils.py
@@ -18,6 +18,7 @@ from typing_extensions import NotRequired, TypedDict
 
 import time
 import sys
+import json
 from concurrent.futures import ThreadPoolExecutor, as_completed
 
 from .client import BaseHumanloop
@@ -34,7 +35,7 @@ from .requests import HumanEvaluatorRequestParams as HumanEvaluatorDict
 
 
 
-# Responses are Pydantic models and we leverage them for improved request validation
+# Responses are Pydantic models; we leverage them for improved request validation
 from .types import FlowKernelRequest as Flow
 from .types import PromptKernelRequest as Prompt
 from .types import ToolKernelRequest as Tool
@@ -97,22 +98,12 @@ class File(Identifiers):
     """The function being evaluated.
     It will be called using your Dataset `inputs` as follows: `output = callable(**datapoint.inputs)`.
     If `messages` are defined in your Dataset, then `output = callable(**datapoint.inputs, messages=datapoint.messages)`.
-    It should return a single string output. If not, you must provide a `custom_logger`.
-    """
-    custom_logger: NotRequired[Callable]
-    """function that logs the output of your function to Humanloop, replacing the default logging.
-    If provided, it will be called as follows:
-    ```
-    output = callable(**datapoint.inputs).
-    log = custom_logger(client, output)
-    ```
-    Inside the custom_logger, you can use the Humanloop `client` to log the output of your function.
-    If not provided your pipline must return a single string.
+    It should return a string or JSON-serializable output.
     """
 
 
 class Dataset(Identifiers):
-    datapoints: Sequence[DatapointDict]
+    datapoints: NotRequired[Sequence[DatapointDict]]
     """The datapoints to map your function over to produce the outputs required by the evaluation."""
     action: NotRequired[UpdateDatasetAction]
     """How to update the Dataset given the provided Datapoints;
@@ -128,15 +119,6 @@ class Evaluator(Identifiers):
     """The type of return value the Evaluator produces - only required for local Evaluators."""
     callable: NotRequired[Callable]
    """The function to run on the logs to produce the judgment - only required for local Evaluators."""
-    custom_logger: NotRequired[Callable]
-    """optional function that logs the output judgment from your Evaluator to Humanloop, if provided, it will be called as follows:
-    ```
-    judgment = callable(log_dict)
-    log = custom_logger(client, judgmemt)
-    ```
-    Inside the custom_logger, you can use the Humanloop `client` to log the judgment to Humanloop.
-    If not provided your function must return a single string and by default the code will be used to inform the version of the external Evaluator on Humanloop.
-    """
     threshold: NotRequired[float]
     """The threshold to check the Evaluator against.
     If the aggregate value of the Evaluator is below this threshold, the check will fail."""
@@ -213,7 +195,6 @@ def _run_eval(
             f"No `callable` provided for your {type_} file - will attempt to generate logs on Humanloop."
         )
 
-    custom_logger = file.pop("custom_logger", None)
     file_dict = {**file, **version}
 
     if type_ == "flow":
@@ -252,8 +233,17 @@ def _run_eval(
         raise NotImplementedError(f"Unsupported File type: {type_}")
 
     # Upsert the Dataset
-    hl_dataset = client.datasets.upsert(**dataset)
-    hl_dataset = client.datasets.get(id=hl_dataset.id, include_datapoints=True)
+    action = dataset.get("action", "set")  # "set" is the server default - None not allowed.
+    if "datapoints" not in dataset:
+        dataset["datapoints"] = []
+        # Use `upsert` to get the existing dataset ID if no datapoints are provided, given we can't `get` by path.
+        action = "add"
+    hl_dataset = client.datasets.upsert(**dataset, action=action)
+    hl_dataset = client.datasets.get(
+        id=hl_dataset.id,
+        version_id=hl_dataset.version_id,
+        include_datapoints=True
+    )
 
     # Upsert the local Evaluators; other Evaluators are just referenced by `path` or `id`
     local_evaluators: List[Evaluator] = []
@@ -341,20 +331,22 @@ def process_datapoint(datapoint: Datapoint):
                 )
             else:
                 output = function_(**datapoint_dict["inputs"])
-            if custom_logger:
-                log = function_(client=client, output=output)
-            else:
-                if not isinstance(output, str):
+
+            if not isinstance(output, str):
+                try:
+                    output = json.dumps(output)
+                # Raise an error if the output cannot be serialized.
+                except Exception as _:
                     raise ValueError(
-                        f"Your {type_}'s `callable` must return a string if you do not provide a custom logger."
+                        f"Your {type_}'s `callable` must return a string or a JSON serializable object."
                     )
-                log = log_func(
-                    inputs=datapoint.inputs,
-                    output=output,
-                    source_datapoint_id=datapoint.id,
-                    start_time=start_time,
-                    end_time=datetime.now(),
-                )
+            log = log_func(
+                inputs=datapoint.inputs,
+                output=output,
+                source_datapoint_id=datapoint.id,
+                start_time=start_time,
+                end_time=datetime.now(),
+            )
         except Exception as e:
             log = log_func(
                 inputs=datapoint.inputs,
@@ -377,18 +369,14 @@ def process_datapoint(datapoint: Datapoint):
                 else:
                     judgment = eval_function(log.dict())
 
-                if local_evaluator.get("custom_logger", None):
-                    local_evaluator["custom_logger"](client=client, judgment=judgment)
-                else:
-                    # The API call will validate the judgment
-                    _ = client.evaluators.log(
-                        parent_id=log.id,
-                        id=local_evaluator.get("id"),
-                        path=local_evaluator.get("path"),
-                        judgment=judgment,
-                        start_time=start_time,
-                        end_time=datetime.now(),
-                    )
+                _ = client.evaluators.log(
+                    parent_id=log.id,
+                    id=local_evaluator.get("id"),
+                    path=local_evaluator.get("path"),
+                    judgment=judgment,
+                    start_time=start_time,
+                    end_time=datetime.now(),
+                )
             except Exception as e:
                 _ = client.evaluators.log(
                     parent_id=log.id,
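Note (not part of the diff): a minimal sketch of the new output contract introduced above, using only the Python standard library. The helper name `serialize_output` and the `__main__` usage are illustrative assumptions; the diff applies the same logic inline in `process_datapoint`: string outputs pass through unchanged, anything else goes through `json.dumps`, and values that cannot be serialized raise a `ValueError`.

```python
import json
from typing import Any


def serialize_output(output: Any, type_: str) -> str:
    """Illustrative helper mirroring the fallback added in the diff: strings pass
    through unchanged, anything else is JSON-encoded, and unserializable values
    raise a ValueError."""
    if isinstance(output, str):
        return output
    try:
        return json.dumps(output)
    except (TypeError, ValueError) as exc:
        raise ValueError(
            f"Your {type_}'s `callable` must return a string or a JSON serializable object."
        ) from exc


if __name__ == "__main__":
    # A flow callable returning a dict is now accepted and serialized before logging.
    print(serialize_output({"answer": 42, "sources": ["doc_1"]}, "flow"))
    # Plain string outputs behave exactly as before.
    print(serialize_output("hello", "prompt"))
```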