Coverage for langsmith/evaluation/evaluator.py: 44%
376 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-12-11 16:15 -0800
« prev ^ index » next coverage.py v7.10.1, created at 2025-12-11 16:15 -0800
1"""This module contains the evaluator classes for evaluating runs."""
3from __future__ import annotations
5import asyncio
6import inspect
7import uuid
8from abc import abstractmethod
9from collections.abc import Awaitable, Sequence
10from typing import (
11 Any,
12 Callable,
13 Literal,
14 Optional,
15 Union,
16 cast,
17)
19from typing_extensions import TypedDict
21from langsmith import run_helpers as rh
22from langsmith import schemas
24try:
25 from pydantic.v1 import ( # type: ignore[import]
26 BaseModel,
27 Field,
28 ValidationError,
29 validator,
30 )
31except ImportError:
32 from pydantic import ( # type: ignore[assignment]
33 BaseModel,
34 Field,
35 ValidationError,
36 validator,
37 )
39import logging
40from functools import wraps
42from langsmith.schemas import SCORE_TYPE, VALUE_TYPE, Example, Run
44logger = logging.getLogger(__name__)
class Category(TypedDict):
    """One option of a categorical feedback type."""

    value: Optional[Union[float, int]]
    """Numeric score or ordinal associated with this category."""
    label: str
    """Label of this category."""
class FeedbackConfig(TypedDict, total=False):
    """Configuration describing a type of feedback.

    Applied on the first creation of a `feedback_key`.
    """

    type: Literal["continuous", "categorical", "freeform"]
    """The kind of feedback."""
    min: Optional[Union[float, int]]
    """The minimum permitted value (if continuous type)."""
    max: Optional[Union[float, int]]
    """The maximum permitted value (if continuous type)."""
    categories: Optional[list[Union[Category, dict]]]
    """The categories of this feedback, given as `Category` entries or plain dicts."""
class EvaluationResult(BaseModel):
    """A single piece of evaluation feedback."""

    key: str
    """Name of the aspect, metric, or label being evaluated."""
    score: SCORE_TYPE = None
    """Numeric score of the evaluation."""
    value: VALUE_TYPE = None
    """Value of the evaluation when it is not numeric."""
    comment: Optional[str] = None
    """Explanation of the evaluation."""
    correction: Optional[dict] = None
    """What the correct value should be, if applicable."""
    evaluator_info: dict = Field(default_factory=dict)
    """Additional information about the evaluator."""
    feedback_config: Optional[Union[FeedbackConfig, dict]] = None
    """Configuration used to generate this feedback."""
    source_run_id: Optional[Union[uuid.UUID, str]] = None
    """ID of the trace of the evaluator itself."""
    target_run_id: Optional[Union[uuid.UUID, str]] = None
    """ID of the trace this evaluation is applied to.

    If none is provided, the evaluation feedback is applied to the
    root trace being evaluated."""
    extra: Optional[dict] = None
    """Metadata for the evaluator run."""

    class Config:
        """Pydantic model configuration."""

        # NOTE(review): `allow_extra` does not look like a config key that
        # pydantic v1 recognises (`extra` is the documented option), so this
        # line may have no effect -- confirm before relying on it.
        allow_extra = False

    @validator("value", pre=True)
    def check_value_non_numeric(cls, v, values):
        """Warn when a numeric `value` is given while no `score` is set.

        The value is never rejected or altered; it is returned as received.
        """
        # Without a score, a numeric value most likely means the caller
        # intended to use the `score` field instead.
        score_is_unset = values.get("score") is None
        if score_is_unset and isinstance(v, (int, float)):
            logger.warning(
                "Numeric values should be provided in"
                " the 'score' field, not 'value'."
                f" Got: {v}"
            )
        return v
class EvaluationResults(TypedDict, total=False):
    """A batch of evaluation results.

    Lets a single evaluator return several metrics at once.
    """

    results: list[EvaluationResult]
    """The individual evaluation results."""
class RunEvaluator:
    """Interface implemented by run evaluators."""

    @abstractmethod
    def evaluate_run(
        self,
        run: Run,
        example: Optional[Example] = None,
        evaluator_run_id: Optional[uuid.UUID] = None,
    ) -> Union[EvaluationResult, EvaluationResults]:
        """Evaluate a run, optionally against an example."""

    async def aevaluate_run(
        self,
        run: Run,
        example: Optional[Example] = None,
        evaluator_run_id: Optional[uuid.UUID] = None,
    ) -> Union[EvaluationResult, EvaluationResults]:
        """Evaluate a run asynchronously.

        Runs the synchronous `evaluate_run` in the running loop's default
        executor, re-entering the tracing context that was active when this
        coroutine was called.
        """
        # Capture the context here, on the calling side, so it can be
        # re-applied inside the executor callable.
        tracing_ctx = rh.get_tracing_context()

        def _evaluate_in_context():
            with rh.tracing_context(**tracing_ctx):
                return self.evaluate_run(run, example, evaluator_run_id)

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _evaluate_in_context)
# Shapes a wrapped run-evaluator function may return.
_RUNNABLE_OUTPUT = Union[EvaluationResult, EvaluationResults, dict]
class ComparisonEvaluationResult(BaseModel):
    """Feedback scores produced by a comparative evaluation.

    Generated by functions that compare two or more runs and return a
    ranking or other feedback.
    """

    key: str
    """Name of the aspect, metric, or label being evaluated."""
    scores: dict[Union[uuid.UUID, str], SCORE_TYPE]
    """Score of each run in the comparison, keyed by run ID."""
    source_run_id: Optional[Union[uuid.UUID, str]] = None
    """ID of the trace of the evaluator itself."""
    comment: Optional[Union[str, dict[Union[uuid.UUID, str], str]]] = None
    """Comment for the scores. A string is shared across all target runs.

    A `dict` maps run IDs to individual comments.
    """
# Shapes a wrapped comparison-evaluator function may return.
_COMPARISON_OUTPUT = Union[ComparisonEvaluationResult, dict]
class DynamicRunEvaluator(RunEvaluator):
    """A dynamic evaluator that wraps a function and transforms it into a `RunEvaluator`.

    This class is designed to be used with the `@run_evaluator` decorator, allowing
    functions that take a `Run` and an optional `Example` as arguments, and return
    an `EvaluationResult` or `EvaluationResults`, to be used as instances of `RunEvaluator`.

    Attributes:
        func (Callable): The traced synchronous function. Only set when `func`
            is not a coroutine function.
        afunc (Callable): The traced coroutine function. Only set when `afunc`
            was provided or `func` is a coroutine function.
    """  # noqa: E501

    def __init__(
        self,
        func: Callable[
            [Run, Optional[Example]],
            Union[_RUNNABLE_OUTPUT, Awaitable[_RUNNABLE_OUTPUT]],
        ],
        # Async function to be used for async evaluation. Optional
        afunc: Optional[
            Callable[
                [Run, Optional[Example]],
                Awaitable[_RUNNABLE_OUTPUT],
            ]
        ] = None,
    ):
        """Initialize the `DynamicRunEvaluator` with a given function.

        Args:
            func (Callable): A function that takes a `Run` and an optional `Example` as
                arguments, and returns an `EvaluationResult`, `EvaluationResults`
                or dict. If it is a coroutine function it is used for async
                evaluation.
            afunc (Optional[Callable]): A coroutine function used for async
                evaluation when `func` is a regular function.

        Raises:
            TypeError: If `func` is a coroutine function and `afunc` is also
                provided.
        """
        (func, prepare_inputs) = _normalize_evaluator_func(func)
        if afunc:
            # When both are given, the input preparation derived from `afunc`
            # replaces the one derived from `func`.
            (afunc, prepare_inputs) = _normalize_evaluator_func(afunc)  # type: ignore[assignment]

        def process_inputs(inputs: dict) -> dict:
            # Report the inputs the user function actually receives to the
            # tracer, rather than the raw run/example pair.
            if prepare_inputs is None:
                return inputs
            (_, _, traced_inputs) = prepare_inputs(
                inputs.get("run"), inputs.get("example")
            )
            return traced_inputs

        wraps(func)(self)
        from langsmith import run_helpers  # type: ignore

        if afunc is not None:
            self.afunc = run_helpers.ensure_traceable(
                afunc, process_inputs=process_inputs
            )
            self._name = getattr(afunc, "__name__", "DynamicRunEvaluator")
        if inspect.iscoroutinefunction(func):
            if afunc is not None:
                raise TypeError(
                    "Func was provided as a coroutine function, but afunc was "
                    "also provided. If providing both, func should be a regular "
                    "function to avoid ambiguity."
                )
            self.afunc = run_helpers.ensure_traceable(
                func, process_inputs=process_inputs
            )
            self._name = getattr(func, "__name__", "DynamicRunEvaluator")
        else:
            self.func = run_helpers.ensure_traceable(
                cast(Callable[[Run, Optional[Example]], _RUNNABLE_OUTPUT], func),
                process_inputs=process_inputs,
            )
            self._name = getattr(func, "__name__", "DynamicRunEvaluator")

    def _coerce_evaluation_result(
        self,
        result: Union[EvaluationResult, dict],
        source_run_id: uuid.UUID,
        allow_no_key: bool = False,
    ) -> EvaluationResult:
        """Turn one raw result into an `EvaluationResult`.

        Args:
            result: An `EvaluationResult` or a dict of its fields.
            source_run_id: Used as `source_run_id` when the result has none.
            allow_no_key: If `True`, a dict without a `"key"` gets this
                evaluator's name as key (the dict is updated in place).

        Raises:
            ValueError: If the dict is empty, has none of `score`, `value`
                or `comment`, or fails model validation.
        """
        if isinstance(result, EvaluationResult):
            if not result.source_run_id:
                result.source_run_id = source_run_id
            return result
        try:
            if not result:
                raise ValueError(
                    "Expected an EvaluationResult object, or dict with a metric"
                    f" 'key' and optional 'score'; got empty result: {result}"
                )
            if "key" not in result and allow_no_key:
                result["key"] = self._name
            if all(k not in result for k in ("score", "value", "comment")):
                raise ValueError(
                    "Expected an EvaluationResult object, or dict with a metric"
                    f" 'key' and optional 'score' or categorical 'value'; got {result}"
                )
            # A `source_run_id` present in the dict wins over the default.
            return EvaluationResult(**{"source_run_id": source_run_id, **result})
        except ValidationError as e:
            raise ValueError(
                "Expected an EvaluationResult object, or dict with a metric"
                f" 'key' and optional 'score'; got {result}"
            ) from e

    def _coerce_evaluation_results(
        self,
        results: Union[dict, EvaluationResults],
        source_run_id: uuid.UUID,
    ) -> Union[EvaluationResult, EvaluationResults]:
        """Coerce a dict into a single result or a batch of results.

        A dict with a `"results"` entry is treated as a batch and each entry
        is coerced individually; any other dict is treated as one result, for
        which a missing key defaults to the evaluator's name.
        """
        if "results" in results:
            cp = results.copy()
            cp["results"] = [
                self._coerce_evaluation_result(r, source_run_id=source_run_id)
                for r in results["results"]
            ]
            return EvaluationResults(**cp)

        return self._coerce_evaluation_result(
            cast(dict, results), source_run_id=source_run_id, allow_no_key=True
        )

    def _format_result(
        self,
        result: Union[
            EvaluationResult, EvaluationResults, dict, str, int, bool, float, list
        ],
        source_run_id: uuid.UUID,
    ) -> Union[EvaluationResult, EvaluationResults]:
        """Normalize whatever the wrapped function returned.

        An `EvaluationResult` is returned as is (with `source_run_id` filled
        in when missing); anything else goes through
        `_format_evaluator_result` and is then coerced.
        """
        if isinstance(result, EvaluationResult):
            if not result.source_run_id:
                result.source_run_id = source_run_id
            return result
        result = _format_evaluator_result(result)
        return self._coerce_evaluation_results(result, source_run_id)

    @property
    def is_async(self) -> bool:
        """Check if the evaluator function is asynchronous.

        Returns:
            bool: `True` if the evaluator function is asynchronous, `False` otherwise.
        """
        return hasattr(self, "afunc")

    def evaluate_run(
        self,
        run: Run,
        example: Optional[Example] = None,
        evaluator_run_id: Optional[uuid.UUID] = None,
    ) -> Union[EvaluationResult, EvaluationResults]:
        """Evaluate a run using the wrapped function.

        This method directly invokes the wrapped function with the provided arguments.

        Args:
            run (Run): The run to be evaluated.
            example (Optional[Example]): An optional example to be used in the evaluation.
            evaluator_run_id (Optional[uuid.UUID]): Run ID to use for the
                evaluator's own trace. A random one is generated if omitted.

        Returns:
            Union[EvaluationResult, EvaluationResults]: The result of the evaluation.

        Raises:
            RuntimeError: If only an async function is available and an event
                loop is already running.
        """  # noqa: E501
        if not hasattr(self, "func"):
            running_loop = asyncio.get_event_loop()
            if running_loop.is_running():
                raise RuntimeError(
                    "Cannot call `evaluate_run` on an async run evaluator from"
                    " within an running event loop. Use `aevaluate_run` instead."
                )
            # Forward the caller's run ID so the async path does not
            # silently replace it with a fresh one.
            return running_loop.run_until_complete(
                self.aevaluate_run(run, example, evaluator_run_id)
            )
        if evaluator_run_id is None:
            evaluator_run_id = uuid.uuid4()
        metadata: dict[str, Any] = {"target_run_id": run.id}
        if getattr(run, "session_id", None):
            metadata["experiment"] = str(run.session_id)
        result = self.func(
            run,
            example,
            langsmith_extra={"run_id": evaluator_run_id, "metadata": metadata},
        )
        return self._format_result(result, evaluator_run_id)

    async def aevaluate_run(
        self,
        run: Run,
        example: Optional[Example] = None,
        evaluator_run_id: Optional[uuid.UUID] = None,
    ):
        """Evaluate a run asynchronously using the wrapped async function.

        This method directly invokes the wrapped async function with the
        provided arguments. When only a synchronous function is available it
        defers to `RunEvaluator.aevaluate_run`.

        Args:
            run (Run): The run to be evaluated.
            example (Optional[Example]): An optional example to be used
                in the evaluation.
            evaluator_run_id (Optional[uuid.UUID]): Run ID to use for the
                evaluator's own trace. A random one is generated if omitted.

        Returns:
            Union[EvaluationResult, EvaluationResults]: The result of the evaluation.
        """
        if not hasattr(self, "afunc"):
            # Forward the caller's run ID to the synchronous fallback.
            return await super().aevaluate_run(run, example, evaluator_run_id)
        if evaluator_run_id is None:
            evaluator_run_id = uuid.uuid4()
        metadata: dict[str, Any] = {"target_run_id": run.id}
        if getattr(run, "session_id", None):
            metadata["experiment"] = str(run.session_id)
        result = await self.afunc(
            run,
            example,
            langsmith_extra={"run_id": evaluator_run_id, "metadata": metadata},
        )
        return self._format_result(result, evaluator_run_id)

    def __call__(
        self, run: Run, example: Optional[Example] = None
    ) -> Union[EvaluationResult, EvaluationResults]:
        """Make the evaluator callable, allowing it to be used like a function.

        This method enables the evaluator instance to be called directly, forwarding the
        call to `evaluate_run`.

        Args:
            run (Run): The run to be evaluated.
            example (Optional[Example]): An optional example to be used in the evaluation.

        Returns:
            Union[EvaluationResult, EvaluationResults]: The result of the evaluation.
        """  # noqa: E501
        return self.evaluate_run(run, example)

    def __repr__(self) -> str:
        """Represent the DynamicRunEvaluator object."""
        return f"<DynamicRunEvaluator {self._name}>"
def run_evaluator(
    func: Callable[
        [Run, Optional[Example]], Union[_RUNNABLE_OUTPUT, Awaitable[_RUNNABLE_OUTPUT]]
    ],
):
    """Turn a function into a run evaluator.

    Usable as a decorator: the function is wrapped in a `DynamicRunEvaluator`.
    """
    evaluator = DynamicRunEvaluator(func)
    return evaluator
# Upper bound, in characters, on the reprs produced by `_maxsize_repr`.
_MAXSIZE = 10_000


def _maxsize_repr(obj: Any):
    """Return `repr(obj)`, cut to at most `_MAXSIZE` characters.

    A repr that is too long keeps its first `_MAXSIZE - 4` characters and
    gets `"...)"` appended, so the result is exactly `_MAXSIZE` long.
    """
    text = repr(obj)
    if len(text) <= _MAXSIZE:
        return text
    return text[: _MAXSIZE - 4] + "...)"
class DynamicComparisonRunEvaluator:
    """Compare predictions (as traces) from 2 or more runs."""

    def __init__(
        self,
        func: Callable[
            [Sequence[Run], Optional[Example]],
            Union[_COMPARISON_OUTPUT, Awaitable[_COMPARISON_OUTPUT]],
        ],
        # Async function to be used for async evaluation. Optional
        afunc: Optional[
            Callable[
                [Sequence[Run], Optional[Example]],
                Awaitable[_COMPARISON_OUTPUT],
            ]
        ] = None,
    ):
        """Initialize the `DynamicComparisonRunEvaluator` with a given function.

        Args:
            func (Callable): A function that takes a sequence of `Run` objects and
                an optional `Example` as arguments, and returns a dict or
                `ComparisonEvaluationResult`. If it is a coroutine function it
                is used for async evaluation.
            afunc (Optional[Callable]): A coroutine function used for async
                evaluation when `func` is a regular function.

        Raises:
            TypeError: If `func` is a coroutine function and `afunc` is also
                provided.
        """
        (func, prepare_inputs) = _normalize_comparison_evaluator_func(func)
        if afunc:
            # When both are given, the input preparation derived from `afunc`
            # replaces the one derived from `func`.
            (afunc, prepare_inputs) = _normalize_comparison_evaluator_func(afunc)  # type: ignore[assignment]

        def process_inputs(inputs: dict) -> dict:
            # Report the inputs the user function actually receives to the
            # tracer, rather than the raw runs/example pair.
            if prepare_inputs is None:
                return inputs
            (_, _, traced_inputs) = prepare_inputs(
                inputs.get("runs"), inputs.get("example")
            )
            return traced_inputs

        wraps(func)(self)
        from langsmith import run_helpers  # type: ignore

        if afunc is not None:
            self.afunc = run_helpers.ensure_traceable(
                afunc, process_inputs=process_inputs
            )
            self._name = getattr(afunc, "__name__", "DynamicRunEvaluator")
        if inspect.iscoroutinefunction(func):
            if afunc is not None:
                raise TypeError(
                    "Func was provided as a coroutine function, but afunc was "
                    "also provided. If providing both, func should be a regular "
                    "function to avoid ambiguity."
                )
            self.afunc = run_helpers.ensure_traceable(
                func, process_inputs=process_inputs
            )
            self._name = getattr(func, "__name__", "DynamicRunEvaluator")
        else:
            self.func = run_helpers.ensure_traceable(
                cast(
                    Callable[
                        [Sequence[Run], Optional[Example]],
                        _COMPARISON_OUTPUT,
                    ],
                    func,
                ),
                process_inputs=process_inputs,
            )
            self._name = getattr(func, "__name__", "DynamicRunEvaluator")

    @property
    def is_async(self) -> bool:
        """Check if the evaluator function is asynchronous.

        Returns:
            bool: `True` if the evaluator function is asynchronous, `False` otherwise.
        """
        return hasattr(self, "afunc")

    def compare_runs(
        self, runs: Sequence[Run], example: Optional[Example] = None
    ) -> ComparisonEvaluationResult:
        """Compare runs to score preferences.

        Args:
            runs: A list of runs to compare.
            example: An optional example to be used in the evaluation.

        Returns:
            ComparisonEvaluationResult: The result of the comparison.

        Raises:
            RuntimeError: If only an async function is available and an event
                loop is already running.
        """  # noqa: E501
        if not hasattr(self, "func"):
            running_loop = asyncio.get_event_loop()
            if running_loop.is_running():
                raise RuntimeError(
                    "Cannot call `compare_runs` on an async run evaluator from"
                    " within an running event loop. Use `acompare_runs` instead."
                )
            return running_loop.run_until_complete(self.acompare_runs(runs, example))
        source_run_id = uuid.uuid4()
        tags = self._get_tags(runs)
        # TODO: Add metadata for the "comparison experiment" here
        result = self.func(
            runs,
            example,
            langsmith_extra={"run_id": source_run_id, "tags": tags},
        )
        return self._format_results(result, source_run_id, runs)

    async def acompare_runs(
        self, runs: Sequence[Run], example: Optional[Example] = None
    ) -> ComparisonEvaluationResult:
        """Compare runs asynchronously using the wrapped async function.

        This method directly invokes the wrapped async function with the
        provided arguments. When only a synchronous function is available,
        `compare_runs` is called instead.

        Args:
            runs (Sequence[Run]): The runs to be compared.
            example (Optional[Example]): An optional example to be used
                in the evaluation.

        Returns:
            ComparisonEvaluationResult: The result of the evaluation.
        """
        if not hasattr(self, "afunc"):
            return self.compare_runs(runs, example)
        source_run_id = uuid.uuid4()
        tags = self._get_tags(runs)
        # TODO: Add metadata for the "comparison experiment" here
        result = await self.afunc(
            runs,
            example,
            langsmith_extra={"run_id": source_run_id, "tags": tags},
        )
        return self._format_results(result, source_run_id, runs)

    def __call__(
        self, runs: Sequence[Run], example: Optional[Example] = None
    ) -> ComparisonEvaluationResult:
        """Make the evaluator callable, allowing it to be used like a function.

        This method enables the evaluator instance to be called directly, forwarding the
        call to `compare_runs`.

        Args:
            runs (Sequence[Run]): The runs to be compared.
            example (Optional[Example]): An optional example to be used in the evaluation.

        Returns:
            ComparisonEvaluationResult: The result of the evaluation.
        """  # noqa: E501
        return self.compare_runs(runs, example)

    def __repr__(self) -> str:
        """Represent the DynamicComparisonRunEvaluator object."""
        return f"<DynamicComparisonRunEvaluator {self._name}>"

    @staticmethod
    def _get_tags(runs: Sequence[Run]) -> list[str]:
        """Build the tags attached to the evaluator trace.

        Every run contributes a `run:<id>` tag and, when it has a truthy
        `session_id`, an `experiment:<session_id>` tag.
        """
        # Add tags to support filtering
        tags = []
        for run in runs:
            tags.append("run:" + str(run.id))
            if getattr(run, "session_id", None):
                tags.append("experiment:" + str(run.session_id))
        return tags

    def _format_results(
        self,
        result: Union[dict, list, ComparisonEvaluationResult],
        source_run_id: uuid.UUID,
        runs: Sequence[Run],
    ) -> ComparisonEvaluationResult:
        """Normalize the wrapped function's output.

        Args:
            result: A `ComparisonEvaluationResult`, a dict of its fields (the
                key defaults to this evaluator's name), or a list of scores
                paired with `runs` by position.
            source_run_id: Used as `source_run_id` when the result has none.
            runs: The runs that were compared.

        Raises:
            ValueError: If `result` has an unsupported type or fails model
                validation.
        """
        if isinstance(result, ComparisonEvaluationResult):
            if not result.source_run_id:
                result.source_run_id = source_run_id
            return result
        elif isinstance(result, list):
            # `zip` stops at the shorter sequence, so surplus runs or scores
            # are ignored.
            result = {
                "scores": {run.id: score for run, score in zip(runs, result)},
                "key": self._name,
                "source_run_id": source_run_id,
            }
        elif isinstance(result, dict):
            if "key" not in result:
                result["key"] = self._name
        else:
            msg = (
                "Expected 'dict', 'list' or 'ComparisonEvaluationResult' result "
                f"object. Received: {result=}"
            )
            raise ValueError(msg)
        try:
            return ComparisonEvaluationResult(
                **{"source_run_id": source_run_id, **result}
            )
        except ValidationError as e:
            raise ValueError(
                "Expected a dictionary with a 'key' and dictionary of scores mapping"
                " run IDs to numeric scores, or ComparisonEvaluationResult object,"
                f" got {result}"
            ) from e
def comparison_evaluator(
    func: Callable[
        [Sequence[Run], Optional[Example]],
        Union[_COMPARISON_OUTPUT, Awaitable[_COMPARISON_OUTPUT]],
    ],
) -> DynamicComparisonRunEvaluator:
    """Create a comparison evaluator from a function.

    Usable as a decorator: the function is wrapped in a
    `DynamicComparisonRunEvaluator`.
    """
    evaluator = DynamicComparisonRunEvaluator(func)
    return evaluator
def _normalize_evaluator_func(
    func: Callable,
) -> tuple[
    Union[
        Callable[[Run, Optional[Example]], _RUNNABLE_OUTPUT],
        Callable[[Run, Optional[Example]], Awaitable[_RUNNABLE_OUTPUT]],
    ],
    Optional[Callable[..., dict]],
]:
    """Adapt an evaluator function to the `(run, example)` calling convention.

    Args:
        func: The user's evaluator. Its parameters are either the legacy pair
            (any two required parameters, taken to be the run and the
            example) or any of the supported names `run`, `example`,
            `inputs`, `outputs`, `reference_outputs`, `attachments`, plus
            optional parameters with defaults.

    Returns:
        A pair `(callable, prepare_inputs)`. For legacy signatures the
        original function and `None`. Otherwise a wrapper (a coroutine
        function if `func` is one) taking `(run, example)`, and the helper
        that maps `(run, example)` to `(args, kwargs, traced_inputs)`.

    Raises:
        ValueError: If `func` has no arguments, or has unsupported argument
            names and does not have exactly two required arguments.
    """
    supported_args = (
        "run",
        "example",
        "inputs",
        "outputs",
        "reference_outputs",
        "attachments",
    )
    sig = inspect.signature(func)
    all_args = [pname for pname, p in sig.parameters.items() if p.kind != p.VAR_KEYWORD]
    args_with_defaults = [
        pname
        for pname, p in sig.parameters.items()
        if p.default is not inspect.Parameter.empty
    ]
    required_args = [a for a in all_args if a not in args_with_defaults]
    all_recognized = all(
        pname in supported_args or pname in args_with_defaults for pname in all_args
    )
    if not all_args or (not all_recognized and len(required_args) != 2):
        msg = (
            "Invalid evaluator function. Must have at least one "
            f"argument. Supported arguments are {supported_args}. Please "
            "see https://docs.smith.langchain.com/evaluation/how_to_guides/evaluation/evaluate_llm_application#use-custom-evaluators"  # noqa: E501
        )
        raise ValueError(msg)
    # For backwards compatibility we assume custom arg names are Run and Example
    # types, respectively.
    if not all_recognized or all_args == ["run", "example"]:
        return func, None

    def _prepare_inputs(
        run: Run, example: Optional[Example]
    ) -> tuple[list, dict, dict]:
        arg_map = {
            "run": run,
            "example": example,
            "inputs": example.inputs if example else {},
            "outputs": run.outputs or {},
            "attachments": example.attachments or {} if example else {},
            "reference_outputs": example.outputs or {} if example else {},
        }
        kwargs: dict = {}
        args: list = []
        traced_inputs: dict = {}
        # Turns False once a positional-or-keyword parameter is left at its
        # default; later values are then passed by keyword so they cannot
        # slide into the skipped parameter's slot.
        bind_positionally = True
        for param_name, param in sig.parameters.items():
            if param_name not in arg_map:
                # Could have params with defaults that are not in the arg map
                if param.kind == param.POSITIONAL_ONLY:
                    # Cannot be skipped by keyword: pass its default through.
                    args.append(param.default)
                elif param.kind == param.POSITIONAL_OR_KEYWORD:
                    bind_positionally = False
                continue
            if param.kind == param.POSITIONAL_ONLY or (
                param.kind == param.POSITIONAL_OR_KEYWORD and bind_positionally
            ):
                args.append(arg_map[param_name])
            else:
                kwargs[param_name] = arg_map[param_name]
            # Full run/example objects are traced as size-capped reprs.
            traced_inputs[param_name] = (
                _maxsize_repr(arg_map[param_name])
                if param_name in ("run", "example")
                else arg_map[param_name]
            )
        return args, kwargs, traced_inputs

    if inspect.iscoroutinefunction(func):

        async def awrapper(run: Run, example: Optional[Example]) -> _RUNNABLE_OUTPUT:
            (args, kwargs, _) = _prepare_inputs(run, example)
            return await func(*args, **kwargs)

        awrapper.__name__ = getattr(func, "__name__", awrapper.__name__)
        return (awrapper, _prepare_inputs)  # type: ignore[return-value]

    def wrapper(run: Run, example: Optional[Example]) -> _RUNNABLE_OUTPUT:
        (args, kwargs, _) = _prepare_inputs(run, example)
        return func(*args, **kwargs)

    wrapper.__name__ = getattr(func, "__name__", wrapper.__name__)
    return (wrapper, _prepare_inputs)  # type: ignore[return-value]
def _normalize_comparison_evaluator_func(
    func: Callable,
) -> tuple[
    Union[
        Callable[[Sequence[Run], Optional[Example]], _COMPARISON_OUTPUT],
        Callable[[Sequence[Run], Optional[Example]], Awaitable[_COMPARISON_OUTPUT]],
    ],
    Optional[Callable[..., dict]],
]:
    """Adapt a comparison evaluator to the `(runs, example)` calling convention.

    Args:
        func: The user's evaluator. Its parameters are either the legacy pair
            (any two required parameters, taken to be the runs and the
            example) or any of the supported names `runs`, `example`,
            `inputs`, `outputs`, `reference_outputs`, plus optional
            parameters with defaults.

    Returns:
        A pair `(callable, prepare_inputs)`. For legacy signatures the
        original function and `None`. Otherwise a wrapper (a coroutine
        function if `func` is one) taking `(runs, example)`, and the helper
        that maps `(runs, example)` to `(args, kwargs, traced_inputs)`.

    Raises:
        ValueError: If `func` has no arguments, or has unsupported argument
            names and does not have exactly two required arguments.
    """
    supported_args = ("runs", "example", "inputs", "outputs", "reference_outputs")
    sig = inspect.signature(func)
    all_args = [pname for pname, p in sig.parameters.items() if p.kind != p.VAR_KEYWORD]
    args_with_defaults = [
        pname
        for pname, p in sig.parameters.items()
        if p.default is not inspect.Parameter.empty
    ]
    required_args = [a for a in all_args if a not in args_with_defaults]
    all_recognized = all(
        pname in supported_args or pname in args_with_defaults for pname in all_args
    )
    if not all_args or (not all_recognized and len(required_args) != 2):
        msg = (
            "Invalid evaluator function. Must have at least one "
            f"argument. Supported arguments are {supported_args}. Please "
            "see https://docs.smith.langchain.com/evaluation/how_to_guides/evaluation/evaluate_llm_application#use-custom-evaluators"  # noqa: E501
        )
        raise ValueError(msg)
    # For backwards compatibility we assume custom arg names are List[Run] and
    # List[Example] types, respectively.
    if not all_recognized or all_args == ["runs", "example"]:
        return func, None

    def _prepare_inputs(
        runs: Sequence[Run], example: Optional[Example]
    ) -> tuple[list, dict, dict]:
        arg_map = {
            "runs": runs,
            "example": example,
            "inputs": example.inputs if example else {},
            "outputs": [run.outputs or {} for run in runs],
            "reference_outputs": example.outputs or {} if example else {},
        }
        kwargs: dict = {}
        args: list = []
        traced_inputs: dict = {}
        # Turns False once a positional-or-keyword parameter is left at its
        # default; later values are then passed by keyword so they cannot
        # slide into the skipped parameter's slot.
        bind_positionally = True
        for param_name, param in sig.parameters.items():
            if param_name not in arg_map:
                # Could have params with defaults that are not in the arg map
                if param.kind == param.POSITIONAL_ONLY:
                    # Cannot be skipped by keyword: pass its default through.
                    args.append(param.default)
                elif param.kind == param.POSITIONAL_OR_KEYWORD:
                    bind_positionally = False
                continue
            if param.kind == param.POSITIONAL_ONLY or (
                param.kind == param.POSITIONAL_OR_KEYWORD and bind_positionally
            ):
                args.append(arg_map[param_name])
            else:
                kwargs[param_name] = arg_map[param_name]
            # Full runs/example objects are traced as size-capped reprs.
            traced_inputs[param_name] = (
                _maxsize_repr(arg_map[param_name])
                if param_name in ("runs", "example")
                else arg_map[param_name]
            )
        return args, kwargs, traced_inputs

    if inspect.iscoroutinefunction(func):

        async def awrapper(
            runs: Sequence[Run], example: Optional[Example]
        ) -> _COMPARISON_OUTPUT:
            (args, kwargs, _) = _prepare_inputs(runs, example)
            return await func(*args, **kwargs)

        awrapper.__name__ = getattr(func, "__name__", awrapper.__name__)
        return awrapper, _prepare_inputs  # type: ignore[return-value]

    def wrapper(
        runs: Sequence[Run], example: Optional[Example]
    ) -> _COMPARISON_OUTPUT:
        (args, kwargs, _) = _prepare_inputs(runs, example)
        return func(*args, **kwargs)

    wrapper.__name__ = getattr(func, "__name__", wrapper.__name__)
    return wrapper, _prepare_inputs  # type: ignore[return-value]
def _format_evaluator_result(
    result: Union[EvaluationResults, dict, str, int, bool, float, list],
) -> Union[EvaluationResults, dict]:
    """Wrap a raw evaluator return value in a dict.

    Booleans and numbers become `{"score": ...}`, strings `{"value": ...}`,
    lists of dicts `{"results": [...]}`, and dicts are returned unchanged.

    Raises:
        ValueError: If the value is empty (and not a bool or number), is a
            list containing non-dict items, or has an unsupported type.
    """
    # Numbers are handled before the emptiness check so that 0, 0.0 and
    # False are accepted as scores.
    if isinstance(result, (bool, float, int)):
        return {"score": result}
    if not result:
        raise ValueError(
            f"Expected a non-empty dict, str, bool, int, float, list, "
            f"EvaluationResult, or EvaluationResults. Got {result}"
        )
    if isinstance(result, list):
        if any(not isinstance(item, dict) for item in result):
            raise ValueError(
                f"Expected a list of dicts or EvaluationResults. Received {result}."
            )
        return {"results": result}  # type: ignore[misc]
    if isinstance(result, str):
        return {"value": result}
    if isinstance(result, dict):
        return result
    raise ValueError(
        f"Expected a dict, str, bool, int, float, list, EvaluationResult, or "
        f"EvaluationResults. Got {result}"
    )
# Signature of a summary evaluator: called with the runs and the examples
# (as sequences or as lists), it returns one result or a batch of results.
SUMMARY_EVALUATOR_T = Union[
    Callable[
        [Sequence[schemas.Run], Sequence[schemas.Example]],
        Union[EvaluationResult, EvaluationResults],
    ],
    Callable[
        [list[schemas.Run], list[schemas.Example]],
        Union[EvaluationResult, EvaluationResults],
    ],
]
def _normalize_summary_evaluator(func: Callable) -> SUMMARY_EVALUATOR_T:
    """Adapt a summary evaluator to the `(runs, examples)` calling convention.

    Args:
        func: The user's evaluator. Its parameters are either the legacy pair
            (any two required parameters, taken to be the runs and the
            examples) or any of the supported names `runs`, `examples`,
            `inputs`, `outputs`, `reference_outputs`, plus optional
            parameters with defaults. A `**kwargs` parameter is ignored.

    Returns:
        The original function for legacy signatures; otherwise a wrapper
        taking `(runs, examples)` that supplies the requested arguments and
        formats the return value.

    Raises:
        ValueError: If `func` has no arguments, or has unsupported argument
            names and does not have exactly two required arguments.
    """
    supported_args = ("runs", "examples", "inputs", "outputs", "reference_outputs")
    sig = inspect.signature(func)
    # `**kwargs` is not an argument to fill in.
    all_args = [pname for pname, p in sig.parameters.items() if p.kind != p.VAR_KEYWORD]
    args_with_defaults = [
        pname
        for pname, p in sig.parameters.items()
        if p.default is not inspect.Parameter.empty
    ]
    required_args = [a for a in all_args if a not in args_with_defaults]
    all_recognized = all(
        pname in supported_args or pname in args_with_defaults for pname in all_args
    )
    if not all_args or (not all_recognized and len(required_args) != 2):
        msg = (
            f"Invalid evaluator function. Must have at least one "
            f"argument. Supported arguments are {supported_args}."
        )
        if all_args:
            msg += f" Received arguments {all_args}."
        raise ValueError(msg)
    # For backwards compatibility we assume custom arg names are Sequence[Run] and
    # Sequence[Example] types, respectively. Extra parameters with defaults
    # do not make a signature "custom": such functions are wrapped so the
    # supported names receive the values they ask for.
    if not all_recognized or all_args == ["runs", "examples"]:
        return func

    def wrapper(
        runs: Sequence[schemas.Run], examples: Sequence[schemas.Example]
    ) -> Union[EvaluationResult, EvaluationResults]:
        arg_map = {
            "runs": runs,
            "examples": examples,
            "inputs": [example.inputs for example in examples],
            "outputs": [run.outputs or {} for run in runs],
            "reference_outputs": [example.outputs or {} for example in examples],
        }
        kwargs: dict = {}
        args: list = []
        # Turns False once a positional-or-keyword parameter is left at its
        # default; later values are then passed by keyword so they cannot
        # slide into the skipped parameter's slot.
        bind_positionally = True
        for param_name, param in sig.parameters.items():
            if param_name not in arg_map:
                # Could have params with defaults that are not in the arg map
                if param.kind == param.POSITIONAL_ONLY:
                    # Cannot be skipped by keyword: pass its default through.
                    args.append(param.default)
                elif param.kind == param.POSITIONAL_OR_KEYWORD:
                    bind_positionally = False
                continue
            if param.kind == param.POSITIONAL_ONLY or (
                param.kind == param.POSITIONAL_OR_KEYWORD and bind_positionally
            ):
                args.append(arg_map[param_name])
            else:
                kwargs[param_name] = arg_map[param_name]

        result = func(*args, **kwargs)
        if isinstance(result, EvaluationResult):
            return result
        return _format_evaluator_result(result)  # type: ignore

    wrapper.__name__ = getattr(func, "__name__", wrapper.__name__)
    return wrapper  # type: ignore[return-value]