Coverage for langsmith/evaluation/_runner.py: 48%
770 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-12-11 16:15 -0800
« prev ^ index » next coverage.py v7.10.1, created at 2025-12-11 16:15 -0800
1"""V2 Evaluation Interface."""
3from __future__ import annotations
5import ast
6import collections
7import concurrent.futures as cf
8import functools
9import inspect
10import io
11import itertools
12import logging
13import pathlib
14import queue
15import random
16import textwrap
17import threading
18import uuid
19from collections.abc import Awaitable, Generator, Iterable, Iterator, Sequence
20from contextvars import copy_context
21from typing import (
22 TYPE_CHECKING,
23 Any,
24 Callable,
25 Literal,
26 Optional,
27 TypeVar,
28 Union,
29 cast,
30)
32from typing_extensions import TypedDict, overload
34import langsmith
35from langsmith import env as ls_env
36from langsmith import run_helpers as rh
37from langsmith import run_trees as rt
38from langsmith import schemas
39from langsmith import utils as ls_utils
40from langsmith._internal._beta_decorator import _warn_once
41from langsmith.evaluation.evaluator import (
42 SUMMARY_EVALUATOR_T,
43 ComparisonEvaluationResult,
44 DynamicComparisonRunEvaluator,
45 DynamicRunEvaluator,
46 EvaluationResult,
47 EvaluationResults,
48 RunEvaluator,
49 _normalize_summary_evaluator,
50 comparison_evaluator,
51 run_evaluator,
52)
53from langsmith.evaluation.integrations import LangChainStringEvaluator
55if TYPE_CHECKING:
56 import pandas as pd
57 from langchain_core.runnables import Runnable
59 DataFrame = pd.DataFrame
60else:
61 DataFrame = Any
logger = logging.getLogger(__name__)

# A target system under evaluation: a callable that takes the example inputs
# dict (optionally together with a second dict argument) and returns a dict of
# outputs.
TARGET_T = Union[Callable[[dict], dict], Callable[[dict, dict], dict]]

# Where examples come from: a dataset name, a dataset ID, an iterable of
# examples, or a Dataset object.
DATA_T = Union[str, uuid.UUID, Iterable[schemas.Example], schemas.Dataset]

# Row-level evaluator: scores one run (optionally against its example). May be
# a RunEvaluator instance, a (run, example) callable, or any callable returning
# a dict / EvaluationResult / EvaluationResults.
EVALUATOR_T = Union[
    RunEvaluator,
    Callable[
        [schemas.Run, Optional[schemas.Example]],
        Union[EvaluationResult, EvaluationResults],
    ],
    Callable[..., Union[dict, EvaluationResults, EvaluationResult]],
]

# Async counterpart of the (run, example) evaluator signature above.
AEVALUATOR_T = Callable[
    [schemas.Run, Optional[schemas.Example]],
    Awaitable[Union[EvaluationResult, EvaluationResults]],
]

# An existing experiment, given as a name/ID string, a UUID, or a TracerSession.
EXPERIMENT_T = Union[str, uuid.UUID, schemas.TracerSession]
# Overload: a single target (function, Runnable, or one existing experiment)
# produces an ExperimentResults. Mirrors the implementation's signature,
# including `error_handling`, so IDEs and type checkers surface it.
@overload
def evaluate(
    target: Union[TARGET_T, Runnable, EXPERIMENT_T],
    /,
    data: Optional[DATA_T] = None,
    evaluators: Optional[Sequence[EVALUATOR_T]] = None,
    summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
    metadata: Optional[dict] = None,
    experiment_prefix: Optional[str] = None,
    description: Optional[str] = None,
    max_concurrency: Optional[int] = 0,
    num_repetitions: int = 1,
    client: Optional[langsmith.Client] = None,
    blocking: bool = True,
    experiment: Optional[EXPERIMENT_T] = None,
    upload_results: bool = True,
    error_handling: Literal["log", "ignore"] = "log",
    **kwargs: Any,
) -> ExperimentResults: ...
# Overload: a pair of existing experiments is compared pairwise and yields a
# ComparativeExperimentResults.
@overload
def evaluate(
    target: tuple[EXPERIMENT_T, EXPERIMENT_T],
    /,
    data: Optional[DATA_T] = None,
    evaluators: Optional[Sequence[COMPARATIVE_EVALUATOR_T]] = None,
    summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
    metadata: Optional[dict] = None,
    experiment_prefix: Optional[str] = None,
    description: Optional[str] = None,
    max_concurrency: Optional[int] = 0,
    num_repetitions: int = 1,
    client: Optional[langsmith.Client] = None,
    blocking: bool = True,
    experiment: Optional[EXPERIMENT_T] = None,
    upload_results: bool = True,
    **kwargs: Any,
) -> ComparativeExperimentResults:
    ...
def evaluate(
    target: Union[TARGET_T, Runnable, EXPERIMENT_T, tuple[EXPERIMENT_T, EXPERIMENT_T]],
    /,
    data: Optional[DATA_T] = None,
    evaluators: Optional[
        Union[Sequence[EVALUATOR_T], Sequence[COMPARATIVE_EVALUATOR_T]]
    ] = None,
    summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
    metadata: Optional[dict] = None,
    experiment_prefix: Optional[str] = None,
    description: Optional[str] = None,
    max_concurrency: Optional[int] = 0,
    num_repetitions: int = 1,
    client: Optional[langsmith.Client] = None,
    blocking: bool = True,
    experiment: Optional[EXPERIMENT_T] = None,
    upload_results: bool = True,
    error_handling: Literal["log", "ignore"] = "log",
    **kwargs: Any,
) -> Union[ExperimentResults, ComparativeExperimentResults]:
    r"""Evaluate a target system on a given dataset.

    Dispatches on the type of `target`: a single existing experiment is
    re-evaluated with `evaluate_existing`, a two-tuple (or list) of existing
    experiments is compared with `evaluate_comparative`, and anything else is
    run as a new experiment over `data`.

    Args:
        target (TARGET_T | Runnable | EXPERIMENT_T | Tuple[EXPERIMENT_T, EXPERIMENT_T]):
            The target system or experiment(s) to evaluate.

            Can be a function that takes a dict and returns a `dict`, a langchain `Runnable`, an
            existing experiment ID, or a two-tuple of experiment IDs.
        data (DATA_T | None): The dataset to evaluate on.

            Can be a dataset name, a list of examples, or a generator of examples.

            Required when `target` is a function or `Runnable`; must not be
            specified when `target` is one or two existing experiments.
        evaluators (Sequence[EVALUATOR_T] | Sequence[COMPARATIVE_EVALUATOR_T] | None):
            A list of evaluators to run on each example. The evaluator signature
            depends on the target type.
        summary_evaluators (Sequence[SUMMARY_EVALUATOR_T] | None): A list of summary
            evaluators to run on the entire dataset.

            Should not be specified if comparing two existing experiments.
        metadata (dict | None): Metadata to attach to the experiment.
        experiment_prefix (str | None): A prefix to provide for your experiment name.

            Should not be specified if target is a single existing experiment.
        description (str | None): A free-form text description for the experiment.
        max_concurrency (int | None): The maximum number of concurrent
            evaluations to run.

            If `None` then no limit is set. If `0` then no concurrency.
        client (langsmith.Client | None): The LangSmith client to use.
        blocking (bool): Whether to block until the evaluation is complete.
        num_repetitions (int): The number of times to run the evaluation.
            Each item in the dataset will be run and evaluated this many times.
        experiment (schemas.TracerSession | None): An existing experiment to
            extend.

            Mutually exclusive with `experiment_prefix`: passing both raises
            `ValueError`.

            For advanced usage only. Should not be specified if target is an existing
            experiment or two-tuple of experiments.
        upload_results (bool): Whether to upload results to LangSmith.
            Setting this to `False` is in beta. Must be left as `True` when
            target is an existing experiment or two-tuple of experiments.
        error_handling (str, default="log"): How to handle individual run errors.

            `'log'` will trace the runs with the error message as part of the
            experiment, `'ignore'` will not count the run as part of the experiment at
            all.
        **kwargs: Extra options forwarded to `evaluate_existing` or
            `evaluate_comparative` when target is one or two existing
            experiments. Not accepted when creating a new experiment.

    Returns:
        ExperimentResults: If target is a function, `Runnable`, or existing experiment.
        ComparativeExperimentResults: If target is a two-tuple of existing experiments.

    Raises:
        ValueError: If an argument is not allowed for the kind of target given
            (see the notes on each argument above), if a tuple target is not a
            pair of experiments, if `data` is missing for a target function,
            or if `target` is an async function (use `aevaluate` instead).

    Examples:
        Prepare the dataset:

        >>> from typing import Sequence
        >>> from langsmith import Client
        >>> from langsmith.evaluation import evaluate
        >>> from langsmith.schemas import Example, Run
        >>> client = Client()
        >>> dataset = client.clone_public_dataset(
        ...     "https://smith.langchain.com/public/419dcab2-1d66-4b94-8901-0357ead390df/d"
        ... )
        >>> dataset_name = "Evaluate Examples"

        Basic usage:

        >>> def accuracy(run: Run, example: Example):
        ...     # Row-level evaluator for accuracy.
        ...     pred = run.outputs["output"]
        ...     expected = example.outputs["answer"]
        ...     return {"score": expected.lower() == pred.lower()}
        >>> def precision(runs: Sequence[Run], examples: Sequence[Example]):
        ...     # Experiment-level evaluator for precision.
        ...     # TP / (TP + FP)
        ...     predictions = [run.outputs["output"].lower() for run in runs]
        ...     expected = [example.outputs["answer"].lower() for example in examples]
        ...     # yes and no are the only possible answers
        ...     tp = sum([p == e for p, e in zip(predictions, expected) if p == "yes"])
        ...     fp = sum([p == "yes" and e == "no" for p, e in zip(predictions, expected)])
        ...     return {"score": tp / (tp + fp)}
        >>> def predict(inputs: dict) -> dict:
        ...     # This can be any function or just an API call to your app.
        ...     return {"output": "Yes"}
        >>> results = evaluate(
        ...     predict,
        ...     data=dataset_name,
        ...     evaluators=[accuracy],
        ...     summary_evaluators=[precision],
        ...     experiment_prefix="My Experiment",
        ...     description="Evaluating the accuracy of a simple prediction model.",
        ...     metadata={
        ...         "my-prompt-version": "abcd-1234",
        ...     },
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...

        Evaluating over only a subset of the examples

        >>> experiment_name = results.experiment_name
        >>> examples = client.list_examples(dataset_name=dataset_name, limit=5)
        >>> results = evaluate(
        ...     predict,
        ...     data=examples,
        ...     evaluators=[accuracy],
        ...     summary_evaluators=[precision],
        ...     experiment_prefix="My Experiment",
        ...     description="Just testing a subset synchronously.",
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...

        Streaming each prediction to more easily + eagerly debug.

        >>> results = evaluate(
        ...     predict,
        ...     data=dataset_name,
        ...     evaluators=[accuracy],
        ...     summary_evaluators=[precision],
        ...     description="I don't even have to block!",
        ...     blocking=False,
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...
        >>> for i, result in enumerate(results):  # doctest: +ELLIPSIS
        ...     pass

        Using the `evaluate` API with an off-the-shelf LangChain evaluator:

        >>> from langsmith.evaluation import LangChainStringEvaluator  # doctest: +SKIP
        >>> from langchain_openai import ChatOpenAI  # doctest: +SKIP
        >>> def prepare_criteria_data(run: Run, example: Example):  # doctest: +SKIP
        ...     return {
        ...         "prediction": run.outputs["output"],
        ...         "reference": example.outputs["answer"],
        ...         "input": str(example.inputs),
        ...     }
        >>> results = evaluate(  # doctest: +SKIP
        ...     predict,
        ...     data=dataset_name,
        ...     evaluators=[
        ...         accuracy,
        ...         LangChainStringEvaluator("embedding_distance"),
        ...         LangChainStringEvaluator(
        ...             "labeled_criteria",
        ...             config={
        ...                 "criteria": {
        ...                     "usefulness": "The prediction is useful if it is correct"
        ...                     " and/or asks a useful followup question."
        ...                 },
        ...                 "llm": ChatOpenAI(model="gpt-4o"),
        ...             },
        ...             prepare_data=prepare_criteria_data,
        ...         ),
        ...     ],
        ...     description="Evaluating with off-the-shelf LangChain evaluators.",
        ...     summary_evaluators=[precision],
        ... )
        View the evaluation results for experiment:... # doctest: +SKIP

        Evaluating a LangChain object:

        >>> from langchain_core.runnables import chain as as_runnable
        >>> @as_runnable
        ... def nested_predict(inputs):
        ...     return {"output": "Yes"}
        >>> @as_runnable
        ... def lc_predict(inputs):
        ...     return nested_predict.invoke(inputs)
        >>> results = evaluate(
        ...     lc_predict.invoke,
        ...     data=dataset_name,
        ...     evaluators=[accuracy],
        ...     description="This time we're evaluating a LangChain object.",
        ...     summary_evaluators=[precision],
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...

    !!! warning "Behavior changed in `langsmith` 0.2.0"

        'max_concurrency' default updated from None (no limit on concurrency)
        to 0 (no concurrency at all).
    """  # noqa: E501
    if isinstance(target, (str, uuid.UUID, schemas.TracerSession)):
        # Single existing experiment: only evaluators are re-run over its
        # runs, so options that would create or extend an experiment are
        # rejected.
        invalid_args = {
            "num_repetitions": num_repetitions > 1,
            "experiment": bool(experiment),
            "upload_results": not upload_results,
            "experiment_prefix": bool(experiment_prefix),
            "data": bool(data),
        }
        if any(invalid_args.values()):
            msg = (
                f"Received invalid arguments. "
                f"{tuple(k for k, v in invalid_args.items() if v)} should not be "
                f"specified when target is an existing experiment."
            )
            raise ValueError(msg)
        target_id = target if isinstance(target, (str, uuid.UUID)) else target.id
        logger.debug("Running evaluation over existing experiment %s...", target_id)
        return evaluate_existing(
            target,
            evaluators=cast(Optional[Sequence[EVALUATOR_T]], evaluators),
            summary_evaluators=summary_evaluators,
            metadata=metadata,
            max_concurrency=max_concurrency,
            client=client,
            blocking=blocking,
            **kwargs,
        )
    elif isinstance(target, (list, tuple)):
        # Pairwise comparison of two existing experiments.
        invalid_args = {
            "num_repetitions": num_repetitions > 1,
            "experiment": bool(experiment),
            "upload_results": not upload_results,
            "summary_evaluators": bool(summary_evaluators),
            "data": bool(data),
        }
        if len(target) != 2 or not all(
            isinstance(t, (str, uuid.UUID, schemas.TracerSession)) for t in target
        ):
            msg = (
                "Received invalid target. If a tuple is specified it must have length "
                "2 and each element should be the ID or schemas.TracerSession of an "
                f"existing experiment. Received {target=}"
            )
            raise ValueError(msg)
        elif any(invalid_args.values()):
            msg = (
                f"Received invalid arguments. "
                f"{tuple(k for k, v in invalid_args.items() if v)} should not be "
                f"specified when target is two existing experiments."
            )
            raise ValueError(msg)
        # `evaluate_comparative` takes an int, so `None` is not forwarded and
        # that function's own default applies instead.
        if max_concurrency is not None:
            kwargs["max_concurrency"] = max_concurrency
        target_ids = [t if isinstance(t, (str, uuid.UUID)) else t.id for t in target]
        logger.debug(
            "Running pairwise evaluation over existing experiments %s...", target_ids
        )
        return evaluate_comparative(
            target,
            evaluators=cast(Sequence[COMPARATIVE_EVALUATOR_T], evaluators or ()),
            experiment_prefix=experiment_prefix,
            description=description,
            client=client,
            metadata=metadata,
            **kwargs,
        )
    elif kwargs:
        msg = (
            f"Received unsupported arguments {kwargs}. These arguments are not "
            f"supported when creating a new experiment."
        )
        raise ValueError(msg)
    elif not data:
        msg = "Must specify 'data' when running evaluations over a target function."
        raise ValueError(msg)
    elif callable(target) and rh.is_async(target):
        msg = (
            "Async functions are not supported by `evaluate`. "
            "Please use `aevaluate` instead:\n\n"
            "from langsmith import aevaluate\n\n"
            "await aevaluate(\n"
            "    async_target_function,\n"
            "    data=data,\n"
            "    evaluators=evaluators,\n"
            "    # ... other parameters\n"
            ")"
        )
        raise ValueError(msg)
    elif experiment and experiment_prefix:
        msg = (
            "Expected at most one of 'experiment' or 'experiment_prefix',"
            " but both were provided. "
            f"Got: experiment={experiment}, experiment_prefix={experiment_prefix}"
        )
        raise ValueError(msg)
    else:
        # New experiment over a target function / Runnable.
        if not upload_results:
            _warn_once("'upload_results' parameter is in beta.")
        logger.debug("Running evaluation over target system %s...", target)
        return _evaluate(
            target,
            data=data,
            evaluators=cast(Optional[Sequence[EVALUATOR_T]], evaluators),
            summary_evaluators=summary_evaluators,
            metadata=metadata,
            experiment_prefix=experiment_prefix,
            description=description,
            max_concurrency=max_concurrency,
            num_repetitions=num_repetitions,
            client=client,
            blocking=blocking,
            experiment=experiment,
            upload_results=upload_results,
            error_handling=error_handling,
        )
def evaluate_existing(
    experiment: Union[str, uuid.UUID, schemas.TracerSession],
    /,
    evaluators: Optional[Sequence[EVALUATOR_T]] = None,
    summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
    metadata: Optional[dict] = None,
    max_concurrency: Optional[int] = 0,
    client: Optional[langsmith.Client] = None,
    load_nested: bool = False,
    blocking: bool = True,
) -> ExperimentResults:
    r"""Evaluate existing experiment runs.

    Loads the experiment, its runs, and the examples they reference, then runs
    the given evaluators over those runs as part of the same experiment.

    Args:
        experiment (Union[str, uuid.UUID, schemas.TracerSession]): The identifier of the
            experiment to evaluate, or the experiment object itself.
        evaluators (Optional[Sequence[EVALUATOR_T]]): Optional sequence of evaluators to use for individual run evaluation.
        summary_evaluators (Optional[Sequence[SUMMARY_EVALUATOR_T]]): Optional sequence of evaluators
            to apply over the entire dataset.
        metadata (Optional[dict]): Optional metadata to include in the evaluation results.
        max_concurrency (int | None): The maximum number of concurrent
            evaluations to run.

            If `None` then no limit is set. If `0` then no concurrency.
        client (Optional[langsmith.Client]): Optional LangSmith client to use for evaluation.
        load_nested (bool): Whether to load all child runs for the experiment.

            Default is to only load the top-level root runs.
        blocking (bool): Whether to block until evaluation is complete.

    Returns:
        ExperimentResults: The evaluation results.

    Raises:
        KeyError: If a run references an example that is not among the
            examples loaded for the experiment.

    Environment:
        - `LANGSMITH_TEST_CACHE`: If set, API calls will be cached to disk to save time and
            cost during testing.

            Recommended to commit the cache files to your repository for faster CI/CD runs.

            Requires the `'langsmith[vcr]'` package to be installed.

    Examples:
        Define your evaluators

        >>> from typing import Sequence
        >>> from langsmith.schemas import Example, Run
        >>> def accuracy(run: Run, example: Example):
        ...     # Row-level evaluator for accuracy.
        ...     pred = run.outputs["output"]
        ...     expected = example.outputs["answer"]
        ...     return {"score": expected.lower() == pred.lower()}
        >>> def precision(runs: Sequence[Run], examples: Sequence[Example]):
        ...     # Experiment-level evaluator for precision.
        ...     # TP / (TP + FP)
        ...     predictions = [run.outputs["output"].lower() for run in runs]
        ...     expected = [example.outputs["answer"].lower() for example in examples]
        ...     # yes and no are the only possible answers
        ...     tp = sum([p == e for p, e in zip(predictions, expected) if p == "yes"])
        ...     fp = sum([p == "yes" and e == "no" for p, e in zip(predictions, expected)])
        ...     return {"score": tp / (tp + fp)}

        Load the experiment and run the evaluation.

        >>> import uuid
        >>> from langsmith import Client
        >>> from langsmith.evaluation import evaluate, evaluate_existing
        >>> client = Client()
        >>> dataset_name = "__doctest_evaluate_existing_" + uuid.uuid4().hex[:8]
        >>> dataset = client.create_dataset(dataset_name)
        >>> example = client.create_example(
        ...     inputs={"question": "What is 2+2?"},
        ...     outputs={"answer": "4"},
        ...     dataset_id=dataset.id,
        ... )
        >>> def predict(inputs: dict) -> dict:
        ...     return {"output": "4"}
        >>> # First run inference on the dataset
        ... results = evaluate(
        ...     predict, data=dataset_name, experiment_prefix="doctest_experiment"
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...
        >>> experiment_id = results.experiment_name
        >>> # Wait for the experiment to be fully processed and check if we have results
        >>> len(results) > 0
        True
        >>> import time
        >>> time.sleep(2)
        >>> results = evaluate_existing(
        ...     experiment_id,
        ...     evaluators=[accuracy],
        ...     summary_evaluators=[precision],
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...
        >>> client.delete_dataset(dataset_id=dataset.id)
    """  # noqa: E501
    client = client or rt.get_cached_client(timeout_ms=(20_000, 90_001))
    project = _load_experiment(experiment, client)
    runs = _load_traces(experiment, client, load_nested=load_nested)
    data_map = _load_examples_map(client, project)
    # Pair each run with the example it references, so that data[i] belongs
    # to runs[i] when both are handed to `_evaluate`.
    # NOTE(review): `runs` is iterated here and then passed on to `_evaluate`;
    # this assumes `_load_traces` returns a re-iterable sequence - confirm.
    try:
        data = [data_map[cast(uuid.UUID, run.reference_example_id)] for run in runs]
    except KeyError as e:
        # Keep the exception type callers may rely on, but say what is missing
        # instead of surfacing a bare example ID.
        missing_id = e.args[0] if e.args else None
        raise KeyError(
            f"Example {missing_id!r} referenced by a run in experiment "
            f"{project.name!r} was not found among the examples loaded for it."
        ) from e
    return _evaluate(
        runs,
        data=data,
        evaluators=evaluators,
        summary_evaluators=summary_evaluators,
        metadata=metadata,
        max_concurrency=max_concurrency,
        client=client,
        blocking=blocking,
        experiment=project,
    )
class ExperimentResultRow(TypedDict):
    """One row of experiment results: a run, its example, and its evaluations."""

    # The run that was evaluated.
    run: schemas.Run
    # The dataset example associated with the run.
    example: schemas.Example
    # Output of the row-level evaluators for this run.
    evaluation_results: EvaluationResults
class ExperimentResults:
    """Represents the results of an evaluate() call.

    This class provides an iterator interface to iterate over the experiment results
    as they become available. It also provides access to the experiment name,
    the number of results, and a way to wait for the results to be processed.

    Results are collected by ``_process_data``, either synchronously inside the
    constructor (``blocking=True``) or on a background thread
    (``blocking=False``). Iteration yields rows as they arrive and stops once
    processing has finished, whether it succeeded or raised.

    Attributes:
        experiment_name (str): The name of the experiment (read-only property).

    Methods:
        wait() -> None: Waits for the experiment data to be processed.
        to_pandas(start, end) -> DataFrame: Returns the collected rows as a
            DataFrame.
    """

    def __init__(self, experiment_manager: _ExperimentManager, blocking: bool = True):
        self._manager = experiment_manager
        self._results: list[ExperimentResultRow] = []
        self._queue: queue.Queue[ExperimentResultRow] = queue.Queue()
        self._processing_complete = threading.Event()
        if not blocking:
            self._thread: Optional[threading.Thread] = threading.Thread(
                target=self._process_data
            )
            self._thread.start()
        else:
            self._thread = None
            self._process_data()

    @property
    def experiment_name(self) -> str:
        return self._manager.experiment_name

    def __iter__(self) -> Iterator[ExperimentResultRow]:
        ix = 0
        # Keep going while processing is still running, while wake-up signals
        # are pending, or while there are rows this iterator hasn't yielded.
        while (
            not self._processing_complete.is_set()
            or not self._queue.empty()
            or ix < len(self._results)
        ):
            try:
                if ix < len(self._results):
                    yield self._results[ix]
                    ix += 1
                else:
                    # The queue only serves as a wake-up signal; the item taken
                    # here is discarded and rows are read from self._results.
                    self._queue.get(block=True, timeout=0.1)
            except queue.Empty:
                continue

    def _process_data(self) -> None:
        """Collect all rows and summary scores from the experiment manager."""
        try:
            tqdm = _load_tqdm()
            results = self._manager.get_results()
            for item in tqdm(results):
                self._queue.put(item)
                self._results.append(item)

            summary_scores = self._manager.get_summary_scores()
            self._summary_results = summary_scores
        finally:
            # Always signal completion, even if fetching results or summary
            # scores raised. Otherwise ``__iter__`` would loop forever waiting
            # on a background thread that has already died.
            self._processing_complete.set()

    def __len__(self) -> int:
        return len(self._results)

    def to_pandas(
        self, start: Optional[int] = 0, end: Optional[int] = None
    ) -> DataFrame:
        return _to_pandas(self._results, start=start, end=end)

    def _repr_html_(self) -> str:
        import importlib.util

        if self._results and importlib.util.find_spec("pandas"):
            df = self.to_pandas()
            return df._repr_html_()  # type: ignore[operator]
        else:
            return self.__repr__()

    def __repr__(self) -> str:
        return f"<ExperimentResults {self.experiment_name}>"

    def wait(self) -> None:
        """Wait for the evaluation runner to complete.

        This method blocks the current thread until the evaluation runner has
        finished its execution. It returns immediately when the results were
        processed synchronously (``blocking=True``).
        """
        if self._thread:
            self._thread.join()
# --- Public API for comparison experiments ---

# Row-level comparative evaluator: receives the runs being compared for one
# example together with that example, and returns a ComparisonEvaluationResult
# or a dict, either directly or as an awaitable.
COMPARATIVE_EVALUATOR_T = Callable[
    [Sequence[schemas.Run], Optional[schemas.Example]],
    Union[
        ComparisonEvaluationResult,
        dict,
        Awaitable[Union[ComparisonEvaluationResult, dict]],
    ],
]
657def evaluate_comparative(
658 experiments: tuple[EXPERIMENT_T, EXPERIMENT_T],
659 /,
660 evaluators: Sequence[COMPARATIVE_EVALUATOR_T],
661 experiment_prefix: Optional[str] = None,
662 description: Optional[str] = None,
663 max_concurrency: int = 5,
664 client: Optional[langsmith.Client] = None,
665 metadata: Optional[dict] = None,
666 load_nested: bool = False,
667 randomize_order: bool = False,
668) -> ComparativeExperimentResults:
669 r"""Evaluate existing experiment runs against each other.
671 This lets you use pairwise preference scoring to generate more
672 reliable feedback in your experiments.
674 Args:
675 experiments (Tuple[Union[str, uuid.UUID], Union[str, uuid.UUID]]):
676 The identifiers of the experiments to compare.
677 evaluators (Sequence[COMPARATIVE_EVALUATOR_T]):
678 A list of evaluators to run on each example.
679 experiment_prefix (Optional[str]): A prefix to provide for your experiment name.
680 description (Optional[str]): A free-form text description for the experiment.
681 max_concurrency (int): The maximum number of concurrent evaluations to run.
682 client (Optional[langsmith.Client]): The LangSmith client to use.
683 metadata (Optional[dict]): Metadata to attach to the experiment.
684 load_nested (bool): Whether to load all child runs for the experiment.
686 Default is to only load the top-level root runs.
687 randomize_order (bool): Whether to randomize the order of the outputs for each evaluation.
689 Returns:
690 The results of the comparative evaluation.
692 Examples:
693 Suppose you want to compare two prompts to see which one is more effective.
694 You would first prepare your dataset:
696 >>> from typing import Sequence
697 >>> from langsmith import Client
698 >>> from langsmith.evaluation import evaluate
699 >>> from langsmith.schemas import Example, Run
700 >>> client = Client()
701 >>> dataset = client.clone_public_dataset(
702 ... "https://smith.langchain.com/public/419dcab2-1d66-4b94-8901-0357ead390df/d"
703 ... )
704 >>> dataset_name = "Evaluate Examples"
706 Then you would run your different prompts:
707 >>> import functools
708 >>> import openai
709 >>> from langsmith.evaluation import evaluate
710 >>> from langsmith.wrappers import wrap_openai
711 >>> oai_client = openai.Client()
712 >>> wrapped_client = wrap_openai(oai_client)
713 >>> prompt_1 = "You are a helpful assistant."
714 >>> prompt_2 = "You are an exceedingly helpful assistant."
715 >>> def predict(inputs: dict, prompt: str) -> dict:
716 ... completion = wrapped_client.chat.completions.create(
717 ... model="gpt-4o-mini",
718 ... messages=[
719 ... {"role": "system", "content": prompt},
720 ... {
721 ... "role": "user",
722 ... "content": f"Context: {inputs['context']}"
723 ... f"\n\ninputs['question']",
724 ... },
725 ... ],
726 ... )
727 ... return {"output": completion.choices[0].message.content}
728 >>> results_1 = evaluate(
729 ... functools.partial(predict, prompt=prompt_1),
730 ... data=dataset_name,
731 ... description="Evaluating our basic system prompt.",
732 ... blocking=False, # Run these experiments in parallel
733 ... ) # doctest: +ELLIPSIS
734 View the evaluation results for experiment:...
735 >>> results_2 = evaluate(
736 ... functools.partial(predict, prompt=prompt_2),
737 ... data=dataset_name,
738 ... description="Evaluating our advanced system prompt.",
739 ... blocking=False,
740 ... ) # doctest: +ELLIPSIS
741 View the evaluation results for experiment:...
742 >>> results_1.wait()
743 >>> results_2.wait()
745 Finally, you would compare the two prompts directly:
746 >>> import json
747 >>> from langsmith.evaluation import evaluate_comparative
748 >>> from langsmith import schemas
749 >>> def score_preferences(runs: list, example: schemas.Example):
750 ... assert len(runs) == 2 # Comparing 2 systems
751 ... assert isinstance(example, schemas.Example)
752 ... assert all(run.reference_example_id == example.id for run in runs)
753 ... pred_a = runs[0].outputs["output"] if runs[0].outputs else ""
754 ... pred_b = runs[1].outputs["output"] if runs[1].outputs else ""
755 ... ground_truth = example.outputs["answer"] if example.outputs else ""
756 ... tools = [
757 ... {
758 ... "type": "function",
759 ... "function": {
760 ... "name": "rank_preferences",
761 ... "description": "Saves the prefered response ('A' or 'B')",
762 ... "parameters": {
763 ... "type": "object",
764 ... "properties": {
765 ... "reasoning": {
766 ... "type": "string",
767 ... "description": "The reasoning behind the choice.",
768 ... },
769 ... "preferred_option": {
770 ... "type": "string",
771 ... "enum": ["A", "B"],
772 ... "description": "The preferred option, either 'A' or 'B'",
773 ... },
774 ... },
775 ... "required": ["preferred_option"],
776 ... },
777 ... },
778 ... }
779 ... ]
780 ... completion = openai.Client().chat.completions.create(
781 ... model="gpt-4o-mini",
782 ... messages=[
783 ... {"role": "system", "content": "Select the better response."},
784 ... {
785 ... "role": "user",
786 ... "content": f"Option A: {pred_a}"
787 ... f"\n\nOption B: {pred_b}"
788 ... f"\n\nGround Truth: {ground_truth}",
789 ... },
790 ... ],
791 ... tools=tools,
792 ... tool_choice={
793 ... "type": "function",
794 ... "function": {"name": "rank_preferences"},
795 ... },
796 ... )
797 ... tool_args = completion.choices[0].message.tool_calls[0].function.arguments
798 ... loaded_args = json.loads(tool_args)
799 ... preference = loaded_args["preferred_option"]
800 ... comment = loaded_args["reasoning"]
801 ... if preference == "A":
802 ... return {
803 ... "key": "ranked_preference",
804 ... "scores": {runs[0].id: 1, runs[1].id: 0},
805 ... "comment": comment,
806 ... }
807 ... else:
808 ... return {
809 ... "key": "ranked_preference",
810 ... "scores": {runs[0].id: 0, runs[1].id: 1},
811 ... "comment": comment,
812 ... }
813 >>> def score_length_difference(runs: list, example: schemas.Example):
814 ... # Just return whichever response is longer.
815 ... # Just an example, not actually useful in real life.
816 ... assert len(runs) == 2 # Comparing 2 systems
817 ... assert isinstance(example, schemas.Example)
818 ... assert all(run.reference_example_id == example.id for run in runs)
819 ... pred_a = runs[0].outputs["output"] if runs[0].outputs else ""
820 ... pred_b = runs[1].outputs["output"] if runs[1].outputs else ""
821 ... if len(pred_a) > len(pred_b):
822 ... return {
823 ... "key": "length_difference",
824 ... "scores": {runs[0].id: 1, runs[1].id: 0},
825 ... }
826 ... else:
827 ... return {
828 ... "key": "length_difference",
829 ... "scores": {runs[0].id: 0, runs[1].id: 1},
830 ... }
831 >>> results = evaluate_comparative(
832 ... [results_1.experiment_name, results_2.experiment_name],
833 ... evaluators=[score_preferences, score_length_difference],
834 ... client=client,
835 ... ) # doctest: +ELLIPSIS
836 View the pairwise evaluation results at:...
837 >>> eval_results = list(results)
838 >>> assert len(eval_results) >= 10 # doctest: +SKIP
839 >>> assert all(
840 ... "feedback.ranked_preference" in r["evaluation_results"]
841 ... for r in eval_results
842 ... ) # doctest: +SKIP
843 >>> assert all(
844 ... "feedback.length_difference" in r["evaluation_results"]
845 ... for r in eval_results
846 ... ) # doctest: +SKIP
847 """ # noqa: E501
848 if len(experiments) < 2:
849 raise ValueError("Comparative evaluation requires at least 2 experiments.")
850 if not evaluators:
851 raise ValueError(
852 "At least one evaluator is required for comparative evaluation."
853 )
854 if max_concurrency < 0:
855 raise ValueError("max_concurrency must be a positive integer.")
856 client = client or rt.get_cached_client()
858 # TODO: Add information about comparison experiments
859 projects = [_load_experiment(experiment, client) for experiment in experiments]
860 ref_datasets_ = [str(p.reference_dataset_id) for p in projects]
861 if not len(set(ref_datasets_)) == 1:
862 raise ValueError("All experiments must have the same reference dataset.")
863 experiment_ids = [p.id for p in projects]
864 if experiment_prefix is None:
865 experiment_names = [p.name for p in projects if p.name is not None]
866 experiment_name = (
867 " vs. ".join(experiment_names) + "-" + str(uuid.uuid4().hex[:4])
868 )
869 else:
870 experiment_name = experiment_prefix + "-" + str(uuid.uuid4().hex[:8])
871 comparative_experiment_id = uuid.uuid4()
872 comparative_experiment = client.create_comparative_experiment(
873 experiment_name,
874 experiments=experiment_ids,
875 description=description,
876 metadata=metadata,
877 id=comparative_experiment_id,
878 )
879 _print_comparative_experiment_start(
880 cast(
881 tuple[schemas.TracerSessionResult, schemas.TracerSessionResult],
882 tuple(projects),
883 ),
884 comparative_experiment,
885 )
886 runs = [
887 _load_traces(experiment, client, load_nested=load_nested)
888 for experiment in experiments
889 ]
890 # Only check intersections for the experiments
891 examples_intersection = None
892 for runs_list in runs:
893 example_ids_set = {run.reference_example_id for run in runs_list}
894 if examples_intersection is None:
895 examples_intersection = example_ids_set
896 else:
897 examples_intersection &= example_ids_set
898 example_ids_nullable = (
899 list(examples_intersection) if examples_intersection is not None else []
900 )
901 example_ids = [eid for eid in example_ids_nullable if eid is not None]
902 # TODO: Warn if different dataset versions, etc. are used in the different
903 # experiments. We aren't providing any training wheels here.
904 batch_size = 99
905 data = {}
906 for i in range(0, len(example_ids), batch_size):
907 example_ids_batch = example_ids[i : i + batch_size]
908 for e in client.list_examples(
909 dataset_id=projects[0].reference_dataset_id,
910 as_of=projects[0].metadata.get("dataset_version"),
911 example_ids=example_ids_batch,
912 ):
913 data[e.id] = e
914 runs_dict: dict[uuid.UUID, list[schemas.Run]] = collections.defaultdict(list)
915 for runs_list in runs:
916 for run in runs_list:
917 if run.reference_example_id in data:
918 runs_dict[cast(uuid.UUID, run.reference_example_id)].append(run)
920 comparators = [comparison_evaluator(evaluator) for evaluator in evaluators or []]
921 results: dict = {}
923 def evaluate_and_submit_feedback(
924 runs_list: list[schemas.Run],
925 example: schemas.Example,
926 comparator: DynamicComparisonRunEvaluator,
927 executor: cf.Executor,
928 ) -> tuple[uuid.UUID, ComparisonEvaluationResult]:
929 feedback_group_id = uuid.uuid4()
930 if randomize_order:
931 random.shuffle(runs_list)
932 with rh.tracing_context(project_name="evaluators", client=client):
933 result = comparator.compare_runs(runs_list, example)
934 if client is None:
935 raise ValueError("Client is required to submit feedback.")
936 comments = (
937 {str(rid): result.comment for rid in result.scores}
938 if isinstance(result.comment, str)
939 else (result.comment or {})
940 )
941 for run_id, score in result.scores.items():
942 executor.submit(
943 client.create_feedback,
944 run_id=run_id,
945 key=result.key,
946 score=score,
947 comment=comments.get(str(run_id)),
948 comparative_experiment_id=comparative_experiment.id,
949 source_run_id=result.source_run_id,
950 feedback_group_id=feedback_group_id,
951 )
952 return example.id, result
954 tqdm = _load_tqdm()
955 with ls_utils.ContextThreadPoolExecutor(
956 max_workers=max_concurrency or 1
957 ) as executor:
958 futures = []
959 for example_id, runs_list in tqdm(runs_dict.items()):
960 results[example_id] = {"runs": runs_list}
961 for comparator in comparators:
962 if max_concurrency > 1:
963 future = executor.submit(
964 evaluate_and_submit_feedback,
965 runs_list,
966 data[example_id],
967 comparator,
968 executor,
969 )
970 futures.append(future)
971 else:
972 _, result = evaluate_and_submit_feedback(
973 runs_list, data[example_id], comparator, executor
974 )
975 results[example_id][f"feedback.{result.key}"] = result
976 if futures:
977 cf.wait(futures)
978 for future in futures:
979 example_id, result = future.result()
980 results[example_id][f"feedback.{result.key}"] = result
982 return ComparativeExperimentResults(results, data)
class ComparativeExperimentResults:
    """Represents the results of an evaluate_comparative() call.

    All results are already computed when this object is constructed; it is a
    thin read-only view over them.

    Supported access:
        ``results[example_id]``: the per-example result dict. As populated by
            ``evaluate_comparative`` this holds ``"runs"`` plus one
            ``"feedback.<key>"`` entry per comparator.
        ``iter(results)``: yields ``{"example": ..., "evaluation_results": ...}``
            for every stored example. ``"example"`` is ``None`` when no
            examples mapping was supplied.
    """

    def __init__(
        self,
        results: dict,
        examples: Optional[dict[uuid.UUID, schemas.Example]] = None,
    ):
        # Mapping of example id -> per-example result dict.
        self._results = results
        # Optional mapping of example id -> Example, used when iterating.
        self._examples = examples

    def __getitem__(self, key):
        """Return the result associated with the given key."""
        return self._results[key]

    def __iter__(self):
        """Yield one dict per stored result, pairing it with its example."""
        for key, value in self._results.items():
            yield {
                "example": self._examples[key] if self._examples else None,
                "evaluation_results": value,
            }
1017## Private API
1020def _print_comparative_experiment_start(
1021 experiments: tuple[schemas.TracerSession, schemas.TracerSession],
1022 comparative_experiment: schemas.ComparativeExperiment,
1023) -> None:
1024 url = experiments[0].url or experiments[1].url
1025 if url:
1026 project_url = url.split("?")[0]
1027 dataset_id = comparative_experiment.reference_dataset_id
1028 base_url = project_url.split("/projects/p/")[0]
1029 comparison_url = (
1030 f"{base_url}/datasets/{dataset_id}/compare?"
1031 f"selectedSessions={'%2C'.join([str(e.id) for e in experiments])}"
1032 f"&comparativeExperiment={comparative_experiment.id}"
1033 )
1034 print( # noqa: T201
1035 f"View the pairwise evaluation results at:\n{comparison_url}\n\n"
1036 )
def _is_callable(target: Union[TARGET_T, Iterable[schemas.Run], Runnable]) -> bool:
    """Return whether ``target`` should be invoked (function or LangChain runnable).

    Anything else is treated by callers as an iterable of pre-existing runs.
    """
    if callable(target):
        return True
    return _is_langchain_runnable(target)
def _evaluate(
    target: Union[TARGET_T, Iterable[schemas.Run], Runnable],
    /,
    data: DATA_T,
    evaluators: Optional[Sequence[EVALUATOR_T]] = None,
    summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
    metadata: Optional[dict] = None,
    experiment_prefix: Optional[str] = None,
    description: Optional[str] = None,
    max_concurrency: Optional[int] = None,
    num_repetitions: int = 1,
    client: Optional[langsmith.Client] = None,
    blocking: bool = True,
    experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]] = None,
    upload_results: bool = True,
    error_handling: Literal["log", "ignore"] = "log",
) -> ExperimentResults:
    """Build the experiment pipeline and wrap it in ``ExperimentResults``.

    When ``target`` is callable (or a LangChain runnable) it is run on every
    example; otherwise it is cast to an iterable of existing runs which are
    handed to the manager so only evaluation happens. Row-level evaluators and
    summary evaluators are then layered on lazily.
    """
    client = client or rt.get_cached_client()
    target_is_callable = _is_callable(target)
    existing_runs = (
        None if target_is_callable else cast(Iterable[schemas.Run], target)
    )
    resolved_experiment, existing_runs = _resolve_experiment(
        experiment, existing_runs, client
    )

    # Create (or resolve) the experiment; pre-existing runs, if any, mean no
    # prediction step is needed.
    manager = _ExperimentManager(
        data,
        client=client,
        metadata=metadata,
        experiment=resolved_experiment or experiment_prefix,
        description=description,
        num_repetitions=num_repetitions,
        runs=existing_runs,
        include_attachments=_include_attachments(target, evaluators),
        upload_results=upload_results,
        error_handling=error_handling,
    ).start()

    cache_dir = ls_utils.get_cache_dir(None)
    cache_path = (
        pathlib.Path(cache_dir) / f"{manager.dataset_id}.yaml" if cache_dir else None
    )

    with ls_utils.with_optional_cache(cache_path, ignore_hosts=[client.api_url]):
        if target_is_callable:
            # Add predictions to the experiment.
            manager = manager.with_predictions(
                cast(TARGET_T, target), max_concurrency=max_concurrency
            )
        if evaluators:
            # Apply row-level evaluators to the predictions.
            manager = manager.with_evaluators(
                evaluators, max_concurrency=max_concurrency
            )
        if summary_evaluators:
            # Apply the experiment-level summary evaluators.
            manager = manager.with_summary_evaluators(summary_evaluators)
        # Start consuming the results.
        return ExperimentResults(manager, blocking=blocking)
1102def _is_uuid(value: str) -> bool:
1103 try:
1104 uuid.UUID(value)
1105 return True
1106 except ValueError:
1107 return False
def _load_experiment(
    project: EXPERIMENT_T, client: langsmith.Client
) -> schemas.TracerSession:
    """Resolve a session object, UUID, UUID string, or project name to a session."""
    if isinstance(project, schemas.TracerSession):
        return project
    if isinstance(project, uuid.UUID) or _is_uuid(project):
        return client.read_project(project_id=project)
    # Anything that is not a UUID is treated as a project name.
    return client.read_project(project_name=project)
def _load_traces(
    project: Union[str, uuid.UUID, schemas.TracerSession],
    client: langsmith.Client,
    load_nested: bool = False,
) -> list[schemas.Run]:
    """Load the runs of a project.

    Without ``load_nested`` only root runs are fetched and returned. With it,
    every run is fetched, children are attached to their parent's
    ``child_runs`` (sorted by ``dotted_order``), and only root runs are returned.
    """
    is_root = None if load_nested else True
    if isinstance(project, schemas.TracerSession):
        project_filter = {"project_id": project.id}
    elif isinstance(project, uuid.UUID) or _is_uuid(project):
        project_filter = {"project_id": project}
    else:
        project_filter = {"project_name": project}
    fetched = client.list_runs(**project_filter, is_root=is_root)
    if not load_nested:
        return list(fetched)

    children_by_parent: collections.defaultdict[uuid.UUID, list[schemas.Run]] = (
        collections.defaultdict(list)
    )
    roots = []
    runs_by_id = {}
    for run in fetched:
        if run.parent_run_id is None:
            roots.append(run)
        else:
            children_by_parent[run.parent_run_id].append(run)
        runs_by_id[run.id] = run
    for parent_id, children in children_by_parent.items():
        runs_by_id[parent_id].child_runs = sorted(
            children, key=lambda child: child.dotted_order
        )
    return roots
1153def _load_examples_map(
1154 client: langsmith.Client, project: schemas.TracerSession
1155) -> dict[uuid.UUID, schemas.Example]:
1156 return {
1157 e.id: e
1158 for e in client.list_examples(
1159 dataset_id=project.reference_dataset_id,
1160 as_of=project.metadata.get("dataset_version"),
1161 )
1162 }
1165IT = TypeVar("IT")
1168def _load_tqdm() -> Callable[[IT], IT]:
1169 try:
1170 from tqdm.auto import tqdm
1171 except ImportError:
1172 return lambda x: x
1173 return tqdm # type: ignore[return-value]
1176ET = TypeVar("ET", bound="_ExperimentManagerMixin")
1179class _ExperimentManagerMixin:
1180 def __init__(
1181 self,
1182 /,
1183 experiment: Optional[Union[schemas.TracerSession, str]],
1184 metadata: Optional[dict] = None,
1185 client: Optional[langsmith.Client] = None,
1186 description: Optional[str] = None,
1187 ):
1188 self.client = client or rt.get_cached_client()
1189 self._experiment: Optional[schemas.TracerSession] = None
1190 if experiment is None:
1191 self._experiment_name = _get_random_name()
1192 elif isinstance(experiment, str):
1193 self._experiment_name = experiment + "-" + str(uuid.uuid4().hex[:8])
1194 else:
1195 self._experiment_name = cast(str, experiment.name)
1196 self._experiment = experiment
1198 metadata = metadata or {}
1199 if not metadata.get("revision_id"):
1200 metadata = {
1201 "revision_id": ls_env.get_langchain_env_var_metadata().get(
1202 "revision_id"
1203 ),
1204 **metadata,
1205 }
1206 self._metadata = metadata or {}
1207 self._description = description
1209 @property
1210 def experiment_name(self) -> str:
1211 if self._experiment_name is not None:
1212 return self._experiment_name
1213 raise ValueError(
1214 "Experiment name not provided, and experiment not yet started."
1215 )
1217 def _get_experiment(self) -> schemas.TracerSession:
1218 if self._experiment is None:
1219 raise ValueError("Experiment not started yet.")
1220 return self._experiment
1222 def _get_experiment_metadata(self):
1223 project_metadata = self._metadata or {}
1224 project_metadata["__ls_runner"] = "py_sdk_evaluate"
1225 git_info = ls_env.get_git_info()
1226 if git_info:
1227 project_metadata = {
1228 **project_metadata,
1229 "git": git_info,
1230 }
1231 if self._experiment:
1232 project_metadata = {
1233 **self._experiment.metadata,
1234 **project_metadata,
1235 }
1236 return project_metadata
1238 def _create_experiment(
1239 self, dataset_id: uuid.UUID, metadata: dict
1240 ) -> schemas.TracerSession:
1241 # There is a chance of name collision, so we'll retry
1242 starting_name = self._experiment_name
1243 num_attempts = 10
1244 for _ in range(num_attempts):
1245 try:
1246 return self.client.create_project(
1247 self._experiment_name,
1248 description=self._description,
1249 reference_dataset_id=dataset_id,
1250 metadata=metadata,
1251 )
1252 except ls_utils.LangSmithConflictError:
1253 self._experiment_name = f"{starting_name}-{str(uuid.uuid4().hex[:6])}"
1254 raise ValueError(
1255 f"Could not find a unique experiment name in {num_attempts} attempts."
1256 " Please try again with a different experiment name."
1257 )
1259 def _get_project(self, first_example: schemas.Example) -> schemas.TracerSession:
1260 if self._experiment is None:
1261 project_metadata = self._get_experiment_metadata()
1262 project = self._create_experiment(
1263 first_example.dataset_id, project_metadata
1264 )
1265 else:
1266 project = self._experiment
1267 return project
1269 def _print_experiment_start(
1270 self, project: Optional[schemas.TracerSession], first_example: schemas.Example
1271 ) -> None:
1272 if project and project.url:
1273 # TODO: Make this a public API
1274 project_url = project.url.split("?")[0]
1275 dataset_id = first_example.dataset_id
1276 base_url = project_url.split("/projects/p/")[0]
1277 comparison_url = (
1278 f"{base_url}/datasets/{dataset_id}/compare?"
1279 f"selectedSessions={project.id}"
1280 )
1281 print( # noqa: T201
1282 f"View the evaluation results for experiment: '{self.experiment_name}'"
1283 f" at:\n{comparison_url}\n\n"
1284 )
1285 else:
1286 # HACKHACK
1287 print( # noqa: T201
1288 "Starting evaluation of experiment: %s", self.experiment_name
1289 )
class _ExperimentManager(_ExperimentManagerMixin):
    """Manage the execution of experiments.

    Supports lazily running predictions and evaluations in parallel to facilitate
    result streaming and early debugging.

    Args:
        data (DATA_T): The data used for the experiment. Can be a dataset name or ID OR
            a generator of examples.
        experiment (Optional[Union[schemas.TracerSession, str]]): An existing
            tracer session to reuse, or a string used as the experiment-name
            prefix (a random suffix is appended). ``None`` picks a random name.
        metadata (Optional[dict]): Additional metadata for the experiment.
        client (Optional[langsmith.Client]): The LangSmith client used for
            the experiment.
        runs (Optional[Iterable[schemas.Run]]): The runs associated with the experiment
            predictions.
        evaluation_results (Optional[Iterable[EvaluationResults]]): The evaluation
            results for the experiment.
        summary_results (Optional[Iterable[EvaluationResults]]): The aggregate results
            for the experiment.
        description (Optional[str]): Optional experiment description.
        num_repetitions (int): The number of times to run over the data.
        include_attachments (bool): Whether to fetch example attachments when
            resolving ``data``.
        reuse_attachments (bool): Whether to read every attachment's bytes once
            into ``attachment_raw_data_dict`` so fresh readers can be built for
            each repetition.
        upload_results (bool): Whether to create a project and upload runs and
            feedback. When False, tracing runs in "local" mode.
        attachment_raw_data_dict (Optional[dict]): Attachment bytes keyed by
            ``str(example.id) + attachment_name``.
        error_handling (Literal["log", "ignore"]): How target-function errors are
            recorded on runs; forwarded to ``_forward``.
    """

    def __init__(
        self,
        data: DATA_T,
        /,
        experiment: Optional[Union[schemas.TracerSession, str]],
        metadata: Optional[dict] = None,
        client: Optional[langsmith.Client] = None,
        runs: Optional[Iterable[schemas.Run]] = None,
        evaluation_results: Optional[Iterable[EvaluationResults]] = None,
        summary_results: Optional[Iterable[EvaluationResults]] = None,
        description: Optional[str] = None,
        num_repetitions: int = 1,
        include_attachments: bool = False,
        reuse_attachments: bool = False,
        upload_results: bool = True,
        attachment_raw_data_dict: Optional[dict] = None,
        error_handling: Literal["log", "ignore"] = "log",
    ):
        super().__init__(
            experiment=experiment,
            metadata=metadata,
            client=client,
            description=description,
        )
        self._data = data
        self._examples: Optional[Iterable[schemas.Example]] = None
        self._runs = runs
        self._evaluation_results = evaluation_results
        self._summary_results = summary_results
        self._num_repetitions = num_repetitions
        self._include_attachments = include_attachments
        self._reuse_attachments = reuse_attachments
        self._upload_results = upload_results
        self._attachment_raw_data_dict = attachment_raw_data_dict
        self._error_handling = error_handling

    def _reset_example_attachment_readers(
        self, example: schemas.Example
    ) -> schemas.Example:
        """Reset attachment readers for an example.

        This is only in the case that an attachment is going to be used by more
        than 1 callable (target + evaluators). In that case we keep a single copy
        of the attachment data in `self._attachment_raw_data_dict`, and create
        readers from that data. This makes it so that we don't have to keep
        copies of the same data in memory, instead we can just create readers
        from the same data.
        """
        if not hasattr(example, "attachments") or not example.attachments:
            return example

        new_attachments: dict[str, schemas.AttachmentInfo] = {}
        for name, attachment in example.attachments.items():
            if (
                self._attachment_raw_data_dict is not None
                and str(example.id) + name in self._attachment_raw_data_dict
            ):
                new_attachments[name] = {
                    "presigned_url": attachment["presigned_url"],
                    "reader": io.BytesIO(
                        self._attachment_raw_data_dict[str(example.id) + name]
                    ),
                    "mime_type": attachment["mime_type"],
                }
            else:
                new_attachments[name] = attachment

        # Create a new Example instance with the updated attachments
        return schemas.Example(
            id=example.id,
            created_at=example.created_at,
            dataset_id=example.dataset_id,
            inputs=example.inputs,
            outputs=example.outputs,
            metadata=example.metadata,
            modified_at=example.modified_at,
            source_run_id=example.source_run_id,
            attachments=new_attachments,
            _host_url=example._host_url,
            _tenant_id=example._tenant_id,
        )

    @property
    def examples(self) -> Iterable[schemas.Example]:
        """Return a fresh iterator over the (possibly repeated) examples.

        Data is resolved lazily on first access; each call tees the underlying
        iterator so repeated access does not consume it.
        """
        if self._examples is None:
            self._examples = _resolve_data(
                self._data,
                client=self.client,
                include_attachments=self._include_attachments,
            )
            if self._reuse_attachments and self._attachment_raw_data_dict is None:
                examples_copy, self._examples = itertools.tee(self._examples)
                self._attachment_raw_data_dict = {
                    str(e.id) + name: value["reader"].read()
                    for e in examples_copy
                    for name, value in (e.attachments or {}).items()
                }
            if self._num_repetitions > 1:
                examples_list = list(self._examples)
                self._examples = itertools.chain.from_iterable(
                    [
                        self._reset_example_attachment_readers(example)
                        for example in examples_list
                    ]
                    for _ in range(self._num_repetitions)
                )
        self._examples, examples_iter = itertools.tee(self._examples)
        return examples_iter

    @property
    def dataset_id(self) -> str:
        """Return the reference dataset id (from the experiment or first example)."""
        if self._experiment is None or not getattr(
            self._experiment, "reference_dataset_id", None
        ):
            example = next(iter(self.examples))
            return str(example.dataset_id)
        return str(
            cast(schemas.TracerSessionResult, self._experiment).reference_dataset_id
        )

    @property
    def evaluation_results(self) -> Iterable[EvaluationResults]:
        """Return evaluation results, or one empty result per example if none."""
        if self._evaluation_results is None:
            return ({"results": []} for _ in self.examples)
        return self._evaluation_results

    @property
    def runs(self) -> Iterable[schemas.Run]:
        """Return a fresh iterator over the runs; raises if none were provided."""
        if self._runs is None:
            raise ValueError(
                "Runs not provided in this experiment. Please predict first."
            )
        self._runs, runs_iter = itertools.tee(self._runs)
        return runs_iter

    def start(self) -> _ExperimentManager:
        """Create/resolve the project and return a manager bound to it.

        Raises:
            ValueError: If the data yields no examples.
        """
        try:
            first_example = next(itertools.islice(self.examples, 1))
        except StopIteration:
            # A bare StopIteration escaping a regular function is confusing.
            raise ValueError("No examples found in the provided data.") from None
        project = self._get_project(first_example) if self._upload_results else None
        self._print_experiment_start(project, first_example)
        self._metadata["num_repetitions"] = self._num_repetitions
        return self._copy(self.examples, experiment=project)

    def with_predictions(
        self,
        target: TARGET_T,
        /,
        max_concurrency: Optional[int] = None,
    ) -> _ExperimentManager:
        """Lazily apply the target function to the experiment."""
        context = copy_context()
        _experiment_results = context.run(
            self._predict,
            target,
            max_concurrency=max_concurrency,
            include_attachments=_target_include_attachments(target),
        )
        r1, r2 = itertools.tee(_experiment_results, 2)
        return self._copy(
            (pred["example"] for pred in r1), runs=(pred["run"] for pred in r2)
        )

    def with_evaluators(
        self,
        evaluators: Sequence[
            Union[
                EVALUATOR_T,
                RunEvaluator,
            ]
        ],
        *,
        max_concurrency: Optional[int] = None,
    ) -> _ExperimentManager:
        """Lazily apply the provided evaluators to the experiment."""
        evaluators = _resolve_evaluators(evaluators)
        context = copy_context()
        experiment_results = context.run(
            self._score, evaluators, max_concurrency=max_concurrency
        )
        # Split the generator into three so the manager
        # can consume each value individually.
        r1, r2, r3 = itertools.tee(experiment_results, 3)
        return self._copy(
            (result["example"] for result in r1),
            runs=(result["run"] for result in r2),
            evaluation_results=(result["evaluation_results"] for result in r3),
        )

    def with_summary_evaluators(
        self,
        summary_evaluators: Sequence[SUMMARY_EVALUATOR_T],
    ) -> _ExperimentManager:
        """Lazily apply the provided summary evaluators to the experiment."""
        wrapped_evaluators = _wrap_summary_evaluators(summary_evaluators)
        context = copy_context()
        aggregate_feedback_gen = context.run(
            self._apply_summary_evaluators, wrapped_evaluators
        )
        return self._copy(
            self.examples, runs=self.runs, summary_results=aggregate_feedback_gen
        )

    def get_results(self) -> Iterable[ExperimentResultRow]:
        """Return the traces, evaluation results, and associated examples."""
        for run, example, evaluation_results in zip(
            self.runs, self.examples, self.evaluation_results
        ):
            yield ExperimentResultRow(
                run=run,
                example=example,
                evaluation_results=evaluation_results,
            )

    def get_summary_scores(self) -> dict[str, list[dict]]:
        """If `summary_evaluators` were applied, consume and return the results."""
        if self._summary_results is None:
            return {"results": []}
        # Consume the generator
        return {
            "results": [
                res  # type: ignore[misc]
                for results in self._summary_results
                for res in results["results"]
            ]
        }

    # Private methods

    def _predict(
        self,
        target: TARGET_T,
        /,
        max_concurrency: Optional[int] = None,
        include_attachments: bool = False,
    ) -> Generator[_ForwardResults, None, None]:
        """Run the target function on the examples.

        ``max_concurrency == 0`` runs sequentially in order; otherwise results
        are yielded in completion order. The project is finalized afterwards.
        """
        fn = _ensure_traceable(target)

        if max_concurrency == 0:
            for example in self.examples:
                yield _forward(
                    fn,
                    example,
                    self.experiment_name,
                    self._metadata,
                    self.client,
                    self._upload_results,
                    include_attachments,
                    self._error_handling,
                )

        else:
            with ls_utils.ContextThreadPoolExecutor(max_concurrency) as executor:
                futures = [
                    executor.submit(
                        _forward,
                        fn,
                        example,
                        self.experiment_name,
                        self._metadata,
                        self.client,
                        self._upload_results,
                        include_attachments,
                        self._error_handling,
                    )
                    for example in self.examples
                ]
                for future in cf.as_completed(futures):
                    yield future.result()
        # Close out the project.
        self._end()

    def _run_evaluators(
        self,
        evaluators: Sequence[RunEvaluator],
        current_results: ExperimentResultRow,
        executor: cf.ThreadPoolExecutor,
    ) -> ExperimentResultRow:
        """Run every evaluator on one (run, example) pair and log feedback.

        An evaluator exception is logged; when its feedback keys can be
        extracted, error results are recorded (and uploaded) in its place.
        """
        current_context = rh.get_tracing_context()
        metadata = {
            **(current_context["metadata"] or {}),
            **{
                "experiment": self.experiment_name,
                "reference_example_id": current_results["example"].id,
                "reference_run_id": current_results["run"].id,
            },
        }
        with rh.tracing_context(
            **{
                **current_context,
                "project_name": "evaluators",
                "metadata": metadata,
                "enabled": "local" if not self._upload_results else True,
                "client": self.client,
            }
        ):
            run = current_results["run"]
            example = current_results["example"]
            eval_results = current_results["evaluation_results"]
            for evaluator in evaluators:
                evaluator_run_id = uuid.uuid4()
                try:
                    evaluator_response = evaluator.evaluate_run(  # type: ignore[call-arg]
                        run=run,
                        example=example,
                        evaluator_run_id=evaluator_run_id,
                    )

                    eval_results["results"].extend(
                        self.client._select_eval_results(evaluator_response)
                    )
                    if self._upload_results:
                        # TODO: This is a hack
                        self.client._log_evaluation_feedback(
                            evaluator_response, run=run, _executor=executor
                        )
                except Exception as e:
                    try:
                        feedback_keys = _extract_feedback_keys(evaluator)

                        error_response = EvaluationResults(
                            results=[
                                EvaluationResult(
                                    key=key,
                                    source_run_id=evaluator_run_id,
                                    comment=repr(e),
                                    extra={"error": True},
                                )
                                for key in feedback_keys
                            ]
                        )
                        eval_results["results"].extend(
                            self.client._select_eval_results(error_response)
                        )
                        if self._upload_results:
                            # TODO: This is a hack
                            self.client._log_evaluation_feedback(
                                error_response, run=run, _executor=executor
                            )
                    except Exception as e2:
                        logger.debug(f"Error parsing feedback keys: {e2}")
                        pass
                    logger.error(
                        f"Error running evaluator {repr(evaluator)} on"
                        f" run {run.id if run else ''}: {repr(e)}",
                        exc_info=True,
                    )
                # Rewind attachment readers so the next evaluator starts at 0.
                if example.attachments is not None:
                    for attachment in example.attachments:
                        reader = example.attachments[attachment]["reader"]
                        reader.seek(0)

            return ExperimentResultRow(
                run=run,
                example=example,
                evaluation_results=eval_results,
            )

    def _score(
        self,
        evaluators: Sequence[RunEvaluator],
        max_concurrency: Optional[int] = None,
    ) -> Iterable[ExperimentResultRow]:
        """Run the evaluators on the prediction stream.

        Expects runs to be available in the manager.
        (e.g. from a previous prediction step)
        """
        with ls_utils.ContextThreadPoolExecutor(
            max_workers=max_concurrency or 1
        ) as executor:
            if max_concurrency == 0:
                context = copy_context()
                for current_results in self.get_results():
                    yield context.run(
                        self._run_evaluators,
                        evaluators,
                        current_results,
                        executor,
                    )
            else:
                futures = set()
                for current_results in self.get_results():
                    futures.add(
                        executor.submit(
                            self._run_evaluators,
                            evaluators,
                            current_results,
                            executor,
                        )
                    )
                    try:
                        # Since prediction may be slow, yield (with a timeout) to
                        # allow for early results to be emitted.
                        for future in cf.as_completed(futures, timeout=0.001):
                            yield future.result()
                            futures.remove(future)
                    except (cf.TimeoutError, TimeoutError):
                        pass
                for future in cf.as_completed(futures):
                    result = future.result()
                    yield result

    def _apply_summary_evaluators(
        self, summary_evaluators: Sequence[SUMMARY_EVALUATOR_T]
    ) -> Generator[EvaluationResults, None, None]:
        """Run summary evaluators over all runs/examples and yield one aggregate."""
        runs, examples = [], []
        for run, example in zip(self.runs, self.examples):
            runs.append(run)
            examples.append(example)
        aggregate_feedback = []
        with ls_utils.ContextThreadPoolExecutor() as executor:
            project_id = self._get_experiment().id if self._upload_results else None
            current_context = rh.get_tracing_context()
            metadata = {
                **(current_context["metadata"] or {}),
                **{
                    "experiment": self.experiment_name,
                    "experiment_id": project_id,
                },
            }
            with rh.tracing_context(
                **{
                    **current_context,
                    "project_name": "evaluators",
                    "metadata": metadata,
                    "client": self.client,
                    "enabled": "local" if not self._upload_results else True,
                }
            ):
                for evaluator in summary_evaluators:
                    try:
                        summary_eval_result = evaluator(runs, examples)
                        # TODO: Expose public API for this.
                        flattened_results = self.client._select_eval_results(
                            summary_eval_result,
                            fn_name=evaluator.__name__,
                        )
                        aggregate_feedback.extend(flattened_results)
                        if self._upload_results:
                            for result in flattened_results:
                                feedback = result.dict(exclude={"target_run_id"})
                                evaluator_info = feedback.pop("evaluator_info", None)
                                executor.submit(
                                    self.client.create_feedback,
                                    **feedback,
                                    run_id=None,
                                    project_id=project_id,
                                    source_info=evaluator_info,
                                )
                    except Exception as e:
                        logger.error(
                            f"Error running summary evaluator {repr(evaluator)}: {e}",
                            exc_info=True,
                        )
        yield {"results": aggregate_feedback}

    def _get_dataset_version(self) -> Optional[str]:
        """Return the latest ``modified_at`` across examples as ISO text, if any."""
        examples = list(self.examples)
        modified_at = [ex.modified_at for ex in examples if ex.modified_at]
        # Should always be defined in practice when fetched,
        # but the typing permits None
        max_modified_at = max(modified_at) if modified_at else None
        return max_modified_at.isoformat() if max_modified_at else None

    def _get_dataset_splits(self) -> Optional[list[str]]:
        """Return the sorted set of dataset splits used by the examples.

        Examples without a list-valued ``dataset_split`` count as ``"base"``.
        Sorted so the stored project metadata is deterministic.
        """
        examples = list(self.examples)
        splits = set()
        for example in examples:
            if (
                example.metadata
                and example.metadata.get("dataset_split")
                and isinstance(example.metadata["dataset_split"], list)
            ):
                for split in example.metadata["dataset_split"]:
                    if isinstance(split, str):
                        splits.add(split)
            else:
                splits.add("base")

        return sorted(splits)

    def _end(self) -> None:
        """Record dataset version/splits on the project (no-op when not uploading)."""
        if not self._upload_results:
            return
        experiment = self._experiment
        if experiment is None:
            raise ValueError("Experiment not started yet.")

        project_metadata = self._get_experiment_metadata()
        project_metadata["dataset_version"] = self._get_dataset_version()
        project_metadata["dataset_splits"] = self._get_dataset_splits()
        self.client.update_project(
            experiment.id,
            metadata={
                **experiment.metadata,
                **project_metadata,
            },
        )

    def _copy(self, *args: Any, **kwargs: Any) -> _ExperimentManager:
        """Return a new manager sharing this one's state, with overrides applied."""
        default_args = (self._data,)
        default_kwargs = {
            "experiment": self._experiment,
            "metadata": self._metadata,
            "runs": self._runs,
            "client": self.client,
            "evaluation_results": self._evaluation_results,
            "summary_results": self._summary_results,
            "include_attachments": self._include_attachments,
            "reuse_attachments": self._reuse_attachments,
            "upload_results": self._upload_results,
            "attachment_raw_data_dict": self._attachment_raw_data_dict,
            "error_handling": self._error_handling,
        }
        full_args = list(args) + list(default_args[len(args) :])
        full_kwargs = {**default_kwargs, **kwargs}
        return self.__class__(*full_args, **full_kwargs)
def _resolve_evaluators(
    evaluators: Sequence[Union[EVALUATOR_T, RunEvaluator, AEVALUATOR_T]],
) -> Sequence[RunEvaluator]:
    """Normalize every evaluator to a ``RunEvaluator``.

    ``RunEvaluator`` instances pass through unchanged. LangChain string
    evaluators are converted with ``as_run_evaluator()``. Anything else is
    wrapped with ``run_evaluator``.
    """

    def _as_run_evaluator(candidate):
        if isinstance(candidate, RunEvaluator):
            return candidate
        if isinstance(candidate, LangChainStringEvaluator):
            return candidate.as_run_evaluator()
        return run_evaluator(candidate)

    return [_as_run_evaluator(candidate) for candidate in evaluators]
def _wrap_summary_evaluators(
    evaluators: Sequence[SUMMARY_EVALUATOR_T],
) -> list[SUMMARY_EVALUATOR_T]:
    """Wrap each summary evaluator so its invocation is traced.

    The trace records only the lengths of the runs/examples sequences as
    inputs, not their contents.
    """

    def _wrap(evaluator: SUMMARY_EVALUATOR_T) -> SUMMARY_EVALUATOR_T:
        # Capture the display name before normalization replaces the callable.
        trace_name = getattr(evaluator, "__name__", "BatchEvaluator")
        normalized = _normalize_summary_evaluator(evaluator)

        @functools.wraps(normalized)
        def _traced_evaluator(
            runs: Sequence[schemas.Run], examples: Sequence[schemas.Example]
        ) -> Union[EvaluationResult, EvaluationResults]:
            @rh.traceable(name=trace_name)
            def _wrapper_super_inner(
                runs_: str, examples_: str
            ) -> Union[EvaluationResult, EvaluationResults]:
                return normalized(list(runs), list(examples))

            runs_label = f"Runs[] (Length={len(runs)})"
            examples_label = f"Examples[] (Length={len(examples)})"
            return _wrapper_super_inner(runs_label, examples_label)

        return _traced_evaluator

    return [_wrap(evaluator) for evaluator in evaluators]
1878class _ForwardResults(TypedDict):
1879 run: schemas.Run
1880 example: schemas.Example
def _forward(
    fn: rh.SupportsLangsmithExtra,
    example: schemas.Example,
    experiment_name: str,
    metadata: dict,
    client: langsmith.Client,
    upload_results: bool,
    include_attachments: bool = False,
    error_handling: Literal["log", "ignore"] = "log",
) -> _ForwardResults:
    """Invoke the traced target on one example and capture its run.

    Args:
        fn: The traceable target; its arguments are read from ``example``
            attributes named by ``_get_target_args``.
        example: The dataset example to run on.
        experiment_name: Project name the run is traced under.
        metadata: Extra run metadata; ``example_version`` is added.
        client: LangSmith client used for tracing.
        upload_results: When False, tracing is enabled in "local" mode only.
        include_attachments: Whether attachment readers should be rewound
            after the call.
        error_handling: "log" always links the run to the example; "ignore"
            links it only if the target succeeds.

    Returns:
        The captured run (possibly ``None`` if tracing never completed, despite
        the cast) and the example.

    Raises:
        ValueError: If ``error_handling`` is not "log" or "ignore".
    """
    run: Optional[schemas.RunBase] = None

    def _get_run(r: rt.RunTree) -> None:
        nonlocal run
        run = r

    def _set_reference_example_id(r: rt.RunTree) -> None:
        r.reference_example_id = example.id

    example_version = (example.modified_at or example.created_at).isoformat()
    langsmith_extra = rh.LangSmithExtra(
        on_end=_get_run,
        project_name=experiment_name,
        metadata={**metadata, "example_version": example_version},
        client=client,
    )
    if error_handling == "log":
        langsmith_extra["reference_example_id"] = example.id
    elif error_handling == "ignore":
        # Only set the reference_example_id if the run succeeds.
        langsmith_extra["_on_success"] = _set_reference_example_id
    else:
        raise ValueError(f"Unrecognized error_handling value: {error_handling=}")

    with rh.tracing_context(enabled="local" if not upload_results else True):
        try:
            arg_names = _get_target_args(fn)
            args = [getattr(example, argn) for argn in arg_names]
            fn(*args, langsmith_extra=langsmith_extra)
        except Exception as e:
            logger.error(
                f"Error running target function: {e}", exc_info=True, stacklevel=1
            )

    # Rewind attachment readers whether or not the target succeeded, so later
    # consumers of this example (e.g. evaluators) read them from the start.
    # Best-effort: a failure here is logged, never raised.
    if include_attachments and example.attachments is not None:
        try:
            for attachment in example.attachments.values():
                attachment["reader"].seek(0)
        except Exception as e:
            logger.error(f"Error resetting attachment readers: {e}", exc_info=True)
    return _ForwardResults(run=cast(schemas.Run, run), example=example)
def _is_valid_uuid(value: str) -> bool:
    """Return whether ``value`` can be parsed by ``uuid.UUID``."""
    try:
        uuid.UUID(value)
    except ValueError:
        return False
    else:
        return True
def _resolve_data(
    data: DATA_T,
    *,
    client: langsmith.Client,
    include_attachments: bool = False,
) -> Iterable[schemas.Example]:
    """Turn ``data`` into an iterable of examples.

    A ``uuid.UUID``, a UUID-formatted string, or a ``schemas.Dataset`` selects
    a dataset by id; any other string selects a dataset by name. In those
    cases the examples come from ``client.list_examples``. Any other value is
    returned unchanged, on the assumption it already is an iterable of
    examples.
    """
    if isinstance(data, uuid.UUID):
        selector: dict[str, Any] = {"dataset_id": data}
    elif isinstance(data, str):
        if _is_valid_uuid(data):
            selector = {"dataset_id": uuid.UUID(data)}
        else:
            selector = {"dataset_name": data}
    elif isinstance(data, schemas.Dataset):
        selector = {"dataset_id": data.id}
    else:
        return data
    return client.list_examples(**selector, include_attachments=include_attachments)
def _ensure_traceable(
    target: TARGET_T | rh.SupportsLangsmithExtra[[dict], dict] | Runnable,
) -> rh.SupportsLangsmithExtra[[dict], dict]:
    """Return ``target`` in a form that can be traced.

    Already-traceable functions are returned unchanged. Otherwise the target
    is wrapped with ``rh.traceable(name="Target")``; for a LangChain runnable
    the bound ``invoke`` method is wrapped instead of the object itself.

    Raises:
        ValueError: If ``_is_callable(target)`` is false.
    """
    if not _is_callable(target):
        raise ValueError(
            "Target must be a callable function or a langchain/langgraph object. For "
            "example:\n\n"
            "def predict(inputs: dict) -> dict:\n"
            "    # do work, like chain.invoke(inputs)\n"
            "    return {...}\n\n"
            "evaluate(\n"
            "    predict,\n"
            "    ...\n"
            ")"
        )

    if rh.is_traceable_function(target):
        already_traced: rh.SupportsLangsmithExtra[[dict], dict] = target
        return already_traced

    to_trace = (
        target.invoke  # type: ignore[union-attr]
        if _is_langchain_runnable(target)
        else target
    )
    return rh.traceable(name="Target")(cast(Callable, to_trace))
def _include_attachments(target: Any, evaluators: Optional[Sequence]) -> bool:
    """Return whether the target or any evaluator takes an ``attachments`` arg.

    The evaluators are only inspected when the target does not already need
    attachments.
    """
    if _target_include_attachments(target):
        return True
    return _evaluators_include_attachments(evaluators) > 0
def _evaluators_include_attachments(evaluators: Optional[Sequence]) -> int:
    """Count the evaluators that take a positional ``attachments`` parameter.

    Returns 0 when ``evaluators`` is ``None``.
    """
    if evaluators is None:
        return 0
    count = 0
    for evaluator in evaluators:
        if _evaluator_uses_attachments(evaluator):
            count += 1
    return count
def _evaluator_uses_attachments(evaluator: Any) -> bool:
    """Return whether ``evaluator`` has a positional parameter named ``attachments``.

    Non-callables return False. Keyword-only and variadic parameters are not
    considered.
    """
    if not callable(evaluator):
        return False
    positional_kinds = (
        inspect.Parameter.POSITIONAL_ONLY,
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
    )
    for param in inspect.signature(evaluator).parameters.values():
        if param.kind in positional_kinds and param.name == "attachments":
            return True
    return False
def _target_include_attachments(target: Any) -> bool:
    """Return whether ``attachments`` is among the arguments passed to ``target``."""
    target_args = _get_target_args(target)
    return "attachments" in target_args
def _get_target_args(target: Any) -> list[str]:
    """Return the example field names to pass positionally to ``target``.

    The result lists the leading positional parameters of ``target`` (up to
    three) whose names are ``inputs``, ``attachments`` or ``metadata``, in
    declaration order, stopping at the first parameter with any other name.
    If none match, ``["inputs"]`` is returned. LangChain runnables always get
    ``["inputs"]``; any other non-callable yields ``[]``.

    Raises:
        ValueError: If ``target`` has no positional parameters, has more than
            three positional parameters without defaults, or has several
            positional parameters without defaults whose names are not all
            ``inputs``, ``attachments`` or ``metadata``.
    """
    # Check for runnables before ``callable``: a runnable is invoked through
    # ``.invoke`` and may not define ``__call__``, in which case a preceding
    # ``callable`` check would make this branch unreachable.
    if _is_langchain_runnable(target):
        return ["inputs"]
    if not callable(target):
        return []

    supported = {"inputs", "attachments", "metadata"}
    params = list(inspect.signature(target).parameters.values())
    positional_params = [
        p for p in params if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)
    ]
    positional_no_default = [p for p in positional_params if p.default is p.empty]

    if not positional_params:
        raise ValueError(
            "Target function must accept at least one positional argument (inputs)."
        )
    if len(positional_no_default) > 3:
        raise ValueError(
            "Target function must accept at most three "
            "arguments without default values: (inputs, attachments, metadata)."
        )
    if len(positional_no_default) > 1 and (
        {p.name for p in positional_no_default} - supported
    ):
        raise ValueError(
            "When passing multiple positional arguments without default values, they "
            "must be named 'inputs', 'attachments', or 'metadata'. Received: "
            f"{[p.name for p in positional_no_default]}"
        )

    args = []
    for p in positional_params[:3]:
        if p.name not in supported:
            break
        args.append(p.name)
    return args or ["inputs"]
def _resolve_experiment(
    experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]],
    runs: Optional[Iterable[schemas.Run]],
    client: langsmith.Client,
) -> tuple[
    Optional[Union[schemas.TracerSession, str]], Optional[Iterable[schemas.Run]]
]:
    """Resolve the experiment being evaluated, if one is known.

    Args:
        experiment: An existing experiment, either as a ``TracerSession`` or as
            a value resolved through ``_load_experiment(experiment, client)``.
        runs: Runs from an already-started experiment. When ``experiment`` is
            ``None``, the experiment is read from the first run's
            ``session_id``.
        client: LangSmith client used for lookups.

    Returns:
        ``(experiment, runs)``. When ``experiment`` was given, ``runs`` is
        returned unchanged; when the experiment was derived from ``runs``, an
        ``itertools.tee`` copy that still yields the first run is returned.
        ``(None, None)`` when both arguments are ``None``.

    Raises:
        ValueError: If the resolved experiment has no name, if an explicit
            experiment has no ``reference_dataset_id``, or if ``runs`` is empty.
    """
    # TODO: Remove this, handle outside the manager
    if experiment is not None:
        if isinstance(experiment, schemas.TracerSession):
            experiment_ = experiment
        else:
            experiment_ = _load_experiment(experiment, client)

        if not experiment_.name:
            raise ValueError("Experiment name must be defined if provided.")
        if not experiment_.reference_dataset_id:
            raise ValueError(
                "Experiment must have an associated reference_dataset_id, "
                "but none was provided."
            )
        return experiment_, runs
    # If we have runs, that means the experiment was already started.
    if runs is not None:
        # ``tee`` lets us peek at the first run without consuming it from the
        # iterator handed back to the caller.
        runs_, runs = itertools.tee(runs)
        try:
            first_run = next(runs_)
        except StopIteration:
            # Without this, an empty iterable would surface as a bare
            # StopIteration (or RuntimeError when called inside a generator).
            raise ValueError(
                "Cannot infer the experiment from runs: no runs were provided."
            ) from None
        experiment_ = client.read_project(project_id=first_run.session_id)
        if not experiment_.name:
            raise ValueError("Experiment name not found for provided runs.")
        return experiment_, runs
    return None, None
def _get_random_name() -> str:
    """Return a name generated by ``_name_generation.random_name``.

    The name-generation module is imported at call time rather than at module
    import.
    """
    from langsmith.evaluation import _name_generation

    return _name_generation.random_name()
def _extract_feedback_keys(evaluator: RunEvaluator):
    """Best-effort guess of the feedback key(s) an evaluator will emit.

    * ``DynamicRunEvaluator``: keys are statically extracted from the source of
      its ``func`` or, if that is unset, its ``afunc``.
    * An object whose ``evaluator`` attribute has a truthy ``evaluation_name``
      (e.g. a LangChain string evaluator) reports that name.
    * Otherwise ``[]``.
    """
    if isinstance(evaluator, DynamicRunEvaluator):
        for attr_name in ("func", "afunc"):
            wrapped = getattr(evaluator, attr_name, None)
            if wrapped:
                return _extract_code_evaluator_feedback_keys(wrapped)
    # TODO: Support for DynamicComparisonRunEvaluator
    inner_evaluator = getattr(evaluator, "evaluator", None)
    evaluation_name = getattr(inner_evaluator, "evaluation_name", None)
    if evaluation_name:
        return [evaluation_name]
    return []
def _extract_code_evaluator_feedback_keys(func: Callable) -> list[str]:
    """Statically infer the feedback keys a code evaluator returns.

    The evaluator's source is parsed, and each ``return`` statement is checked
    for these shapes (only literal/constant keys are detected):

    * a dict literal with a ``"key"`` entry, or ``dict(key=...)``;
    * ``EvaluationResult(key=...)``;
    * ``EvaluationResults(results=[...])``, or ``results=<name>`` where
      ``<name>`` was assigned a list literal of ``EvaluationResult(...)`` calls;
    * a dict literal whose ``"results"`` entry is a list of dict literals or
      ``EvaluationResult(...)`` / ``dict(...)`` calls with a literal key.

    Literals are matched via ``ast.Constant``. The ``ast.Str`` alias and the
    ``Constant.s`` attribute are deprecated and were removed in Python 3.14.

    Returns:
        The keys found, in encounter order. If the source parsed to a
        function definition but no keys were found, the function's name.
        ``[]`` if the source cannot be retrieved or parsed, or does not start
        with a ``def``/``async def``.
    """

    def extract_dict_keys(node):
        # ``{"key": "<name>", ...}`` or ``dict(key="<name>", ...)``.
        if isinstance(node, ast.Dict):
            key_value = None
            for key, value in zip(node.keys, node.values):
                # ``key`` is None for ``**spread`` entries; isinstance skips it.
                if (
                    isinstance(key, ast.Constant)
                    and key.value == "key"
                    and isinstance(value, ast.Constant)
                ):
                    key_value = value.value
            return [key_value] if key_value else []
        elif (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Name)
            and node.func.id == "dict"
        ):
            for keyword in node.keywords:
                if keyword.arg == "key" and isinstance(keyword.value, ast.Constant):
                    return [keyword.value.value]
        return []

    def extract_evaluation_result_key(node):
        # ``EvaluationResult(key="<name>", ...)``.
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Name)
            and node.func.id == "EvaluationResult"
        ):
            for keyword in node.keywords:
                if keyword.arg == "key" and isinstance(keyword.value, ast.Constant):
                    return [keyword.value.value]
        return []

    def extract_evaluation_results_keys(node, variables):
        # ``EvaluationResults(results=...)`` or ``{"results": [...]}``.
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Name)
            and node.func.id == "EvaluationResults"
        ):
            for keyword in node.keywords:
                if keyword.arg == "results":
                    if isinstance(keyword.value, ast.Name):
                        return variables.get(keyword.value.id, [])
                    elif isinstance(keyword.value, ast.List):
                        keys = []
                        for elt in keyword.value.elts:
                            keys.extend(extract_evaluation_result_key(elt))
                        return keys
        elif isinstance(node, ast.Dict):
            for key, value in zip(node.keys, node.values):
                if (
                    isinstance(key, ast.Constant)
                    and key.value == "results"
                    and isinstance(value, ast.List)
                ):
                    keys = []
                    for elt in value.elts:
                        if isinstance(elt, ast.Dict):
                            for elt_key, elt_value in zip(elt.keys, elt.values):
                                if (
                                    isinstance(elt_key, ast.Constant)
                                    and elt_key.value == "key"
                                    and isinstance(elt_value, ast.Constant)
                                ):
                                    keys.append(elt_value.value)
                        elif (
                            isinstance(elt, ast.Call)
                            and isinstance(elt.func, ast.Name)
                            and elt.func.id in ("EvaluationResult", "dict")
                        ):
                            for keyword in elt.keywords:
                                if keyword.arg == "key" and isinstance(
                                    keyword.value, ast.Constant
                                ):
                                    keys.append(keyword.value.value)
                    return keys
        return []

    try:
        python_code = inspect.getsource(func)
    except (OSError, TypeError):
        # No retrievable source (builtins, ``functools.partial``, callable
        # instances, interactively defined code): nothing to analyze.
        return []

    try:
        tree = ast.parse(textwrap.dedent(python_code))
    except SyntaxError:
        return []

    function_def = tree.body[0] if tree.body else None
    if not isinstance(function_def, (ast.FunctionDef, ast.AsyncFunctionDef)):
        # e.g. the source line of a lambda, which is not a ``def``.
        return []

    # Names bound to a list literal, mapped to the EvaluationResult keys in
    # that list; used to resolve ``EvaluationResults(results=<name>)``.
    variables: dict[str, list] = {}
    keys: list = []
    for node in ast.walk(function_def):
        if isinstance(node, ast.Assign):
            if isinstance(node.value, ast.List):
                list_keys = []
                for elt in node.value.elts:
                    list_keys.extend(extract_evaluation_result_key(elt))
                if isinstance(node.targets[0], ast.Name):
                    variables[node.targets[0].id] = list_keys
        elif isinstance(node, ast.Return) and node.value is not None:
            keys.extend(extract_dict_keys(node.value))
            keys.extend(extract_evaluation_result_key(node.value))
            keys.extend(extract_evaluation_results_keys(node.value, variables))

    # If no keys found, return the function name
    return keys if keys else [function_def.name]
def _to_pandas(
    results: list[ExperimentResultRow],
    start: Optional[int] = 0,
    end: Optional[int] = None,
):
    """Build a ``pandas.DataFrame`` from ``results[start:end]``.

    Each row is flattened by ``_flatten_experiment_results`` into one
    DataFrame row.

    Raises:
        ImportError: If pandas is not installed.
    """
    try:
        import pandas as pd
    except ImportError as exc:
        raise ImportError(
            "The 'pandas' library is required to use the 'to_pandas' function. "
            "Please install it using 'pip install pandas' or "
            "'conda install pandas' before calling this method."
        ) from exc

    flat_rows = _flatten_experiment_results(results, start=start, end=end)
    return pd.DataFrame(flat_rows)
def _flatten_experiment_results(
    results: list[ExperimentResultRow],
    start: Optional[int] = 0,
    end: Optional[int] = None,
):
    """Flatten ``results[start:end]`` into one flat dict per row.

    Keys appear in this order:
    * ``inputs.<k>`` for each example input;
    * ``outputs.<k>`` for each run output;
    * ``error``;
    * ``reference.<k>`` for each reference output, when the example has
      outputs;
    * ``feedback.<key>``, holding the score, or the value when the score is
      ``None``;
    * ``execution_time`` in seconds, or ``None`` without an end time;
    * ``example_id`` and ``id``.
    """
    flattened = []
    for row in results[start:end]:
        example = row["example"]
        run = row["run"]
        record = {f"inputs.{k}": v for k, v in (example.inputs or {}).items()}
        record.update({f"outputs.{k}": v for k, v in (run.outputs or {}).items()})
        record["error"] = run.error
        if example.outputs is not None:
            record.update({f"reference.{k}": v for k, v in example.outputs.items()})
        for feedback in row["evaluation_results"]["results"]:
            record[f"feedback.{feedback.key}"] = (
                feedback.value if feedback.score is None else feedback.score
            )
        if run.end_time:
            record["execution_time"] = (run.end_time - run.start_time).total_seconds()
        else:
            record["execution_time"] = None
        record["example_id"] = run.reference_example_id
        record["id"] = run.id
        flattened.append(record)
    return flattened
@functools.lru_cache(maxsize=1)
def _import_langchain_runnable() -> Optional[type]:
    """Return ``langchain_core.runnables.Runnable``, or ``None`` if unavailable.

    The outcome is cached, so the import is attempted on the first call only.
    """
    try:
        from langchain_core.runnables import Runnable
    except ImportError:
        return None
    else:
        return Runnable
def _is_langchain_runnable(o: Any) -> bool:
    """Return whether ``o`` is an instance of LangChain's ``Runnable``.

    Always False when ``langchain_core`` cannot be imported.
    """
    runnable_cls = _import_langchain_runnable()
    if runnable_cls is None:
        return False
    return isinstance(o, runnable_cls)