Coverage for langsmith/evaluation/_arunner.py: 17%
384 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-12-11 16:15 -0800
« prev ^ index » next coverage.py v7.10.1, created at 2025-12-11 16:15 -0800
1"""V2 Evaluation Interface."""
3from __future__ import annotations
5import asyncio
6import concurrent.futures as cf
7import io
8import logging
9import pathlib
10import uuid
11from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Iterable, Sequence
12from typing import (
13 TYPE_CHECKING,
14 Any,
15 Callable,
16 Literal,
17 Optional,
18 TypeVar,
19 Union,
20 cast,
21)
23import langsmith
24from langsmith import run_helpers as rh
25from langsmith import run_trees, schemas
26from langsmith import run_trees as rt
27from langsmith import utils as ls_utils
28from langsmith._internal import _aiter as aitertools
29from langsmith._internal._beta_decorator import _warn_once
30from langsmith.evaluation._runner import (
31 AEVALUATOR_T,
32 DATA_T,
33 EVALUATOR_T,
34 ExperimentResultRow,
35 _evaluators_include_attachments,
36 _ExperimentManagerMixin,
37 _extract_feedback_keys,
38 _ForwardResults,
39 _get_target_args,
40 _is_langchain_runnable,
41 _load_examples_map,
42 _load_experiment,
43 _load_tqdm,
44 _load_traces,
45 _resolve_data,
46 _resolve_evaluators,
47 _resolve_experiment,
48 _target_include_attachments,
49 _to_pandas,
50 _wrap_summary_evaluators,
51)
52from langsmith.evaluation.evaluator import (
53 SUMMARY_EVALUATOR_T,
54 EvaluationResult,
55 EvaluationResults,
56 RunEvaluator,
57)
if TYPE_CHECKING:
    # These imports are only evaluated by static type checkers, so neither
    # pandas nor langchain_core is imported by this block at runtime.
    import pandas as pd
    from langchain_core.runnables import Runnable

    DataFrame = pd.DataFrame
else:
    # Runtime placeholder so annotations that mention DataFrame still resolve.
    DataFrame = Any

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Accepted shapes for an async target callable: it takes either one dict or
# two dicts and returns an awaitable that resolves to a dict.
ATARGET_T = Union[
    Callable[[dict], Awaitable[dict]], Callable[[dict, dict], Awaitable[dict]]
]
async def aevaluate(
    target: Union[
        ATARGET_T, AsyncIterable[dict], Runnable, str, uuid.UUID, schemas.TracerSession
    ],
    /,
    data: Union[
        DATA_T, AsyncIterable[schemas.Example], Iterable[schemas.Example], None
    ] = None,
    evaluators: Optional[Sequence[Union[EVALUATOR_T, AEVALUATOR_T]]] = None,
    summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
    metadata: Optional[dict] = None,
    experiment_prefix: Optional[str] = None,
    description: Optional[str] = None,
    max_concurrency: Optional[int] = 0,
    num_repetitions: int = 1,
    client: Optional[langsmith.Client] = None,
    blocking: bool = True,
    experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]] = None,
    upload_results: bool = True,
    error_handling: Literal["log", "ignore"] = "log",
    **kwargs: Any,
) -> AsyncExperimentResults:
    r"""Evaluate an async target system on a given dataset.

    Args:
        target (AsyncCallable[[dict], dict] | AsyncIterable[dict] | Runnable | EXPERIMENT_T):
            The target system or existing experiment to evaluate.

            Can be an async function that takes a `dict` and returns a `dict`, a
            langchain `Runnable`, or an existing experiment (name, ID or
            `TracerSession`). Passing a list or tuple of experiments is not
            supported asynchronously and raises a `ValueError`.
        data (Union[DATA_T, AsyncIterable[schemas.Example]]): The dataset to evaluate on.

            Can be a dataset name, a list of examples, an async generator of examples, or an async iterable of examples.
            Must not be provided when `target` is an existing experiment.
        evaluators (Optional[Sequence[EVALUATOR_T]]): A list of evaluators to run
            on each example.
        summary_evaluators (Optional[Sequence[SUMMARY_EVALUATOR_T]]): A list of summary
            evaluators to run on the entire dataset.
        metadata (Optional[dict]): Metadata to attach to the experiment.
        experiment_prefix (Optional[str]): A prefix to provide for your experiment name.
        description (Optional[str]): A description of the experiment.
        max_concurrency (int | None): The maximum number of concurrent
            evaluations to run.

            If `None` then no limit is set. If `0` then no concurrency.
        num_repetitions (int): The number of times to run the evaluation.
            Each item in the dataset will be run and evaluated this many times.
        client (Optional[langsmith.Client]): The LangSmith client to use.
        blocking (bool): Whether to block until the evaluation is complete.
        experiment (Optional[schemas.TracerSession]): An existing experiment to
            extend.

            Mutually exclusive with `experiment_prefix`. For advanced usage only.
        upload_results (bool): Whether to upload the results to LangSmith.

            Passing `False` is a beta feature and emits a one-time warning.
        error_handling (str, default="log"): How to handle individual run errors.

            `'log'` will trace the runs with the error message as part of the
            experiment, `'ignore'` will not count the run as part of the experiment at
            all.
        **kwargs (Any): Extra keyword arguments. They are only accepted when
            `target` is an existing experiment, in which case they are forwarded
            to `aevaluate_existing`.

    Returns:
        An async iterator over the experiment results.

    Raises:
        ValueError: If `target` is an existing experiment and any of
            `num_repetitions`, `experiment`, `upload_results`,
            `experiment_prefix` or `data` is also specified; if `target` is a
            list or tuple; if unsupported `kwargs` are passed for a new
            experiment; if `data` is missing for a new experiment; or if both
            `experiment` and `experiment_prefix` are provided.

    Environment:
        - `LANGSMITH_TEST_CACHE`: If set, API calls will be cached to disk to save time and
            cost during testing.

            Recommended to commit the cache files to your repository for faster CI/CD runs.

            Requires the `'langsmith[vcr]'` package to be installed.

    Examples:
        >>> from typing import Sequence
        >>> from langsmith import Client, aevaluate
        >>> from langsmith.schemas import Example, Run
        >>> client = Client()
        >>> dataset = client.clone_public_dataset(
        ...     "https://smith.langchain.com/public/419dcab2-1d66-4b94-8901-0357ead390df/d"
        ... )
        >>> dataset_name = "Evaluate Examples"

        Basic usage:

        >>> def accuracy(run: Run, example: Example):
        ...     # Row-level evaluator for accuracy.
        ...     pred = run.outputs["output"]
        ...     expected = example.outputs["answer"]
        ...     return {"score": expected.lower() == pred.lower()}

        >>> def precision(runs: Sequence[Run], examples: Sequence[Example]):
        ...     # Experiment-level evaluator for precision.
        ...     # TP / (TP + FP)
        ...     predictions = [run.outputs["output"].lower() for run in runs]
        ...     expected = [example.outputs["answer"].lower() for example in examples]
        ...     # yes and no are the only possible answers
        ...     tp = sum([p == e for p, e in zip(predictions, expected) if p == "yes"])
        ...     fp = sum([p == "yes" and e == "no" for p, e in zip(predictions, expected)])
        ...     return {"score": tp / (tp + fp)}

        >>> import asyncio
        >>> async def apredict(inputs: dict) -> dict:
        ...     # This can be any async function or just an API call to your app.
        ...     await asyncio.sleep(0.1)
        ...     return {"output": "Yes"}
        >>> results = asyncio.run(
        ...     aevaluate(
        ...         apredict,
        ...         data=dataset_name,
        ...         evaluators=[accuracy],
        ...         summary_evaluators=[precision],
        ...         experiment_prefix="My Experiment",
        ...         description="Evaluate the accuracy of the model asynchronously.",
        ...         metadata={
        ...             "my-prompt-version": "abcd-1234",
        ...         },
        ...     )
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...

        Evaluating over only a subset of the examples using an async generator:

        >>> async def example_generator():
        ...     examples = client.list_examples(dataset_name=dataset_name, limit=5)
        ...     for example in examples:
        ...         yield example
        >>> results = asyncio.run(
        ...     aevaluate(
        ...         apredict,
        ...         data=example_generator(),
        ...         evaluators=[accuracy],
        ...         summary_evaluators=[precision],
        ...         experiment_prefix="My Subset Experiment",
        ...         description="Evaluate a subset of examples asynchronously.",
        ...     )
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...

        Streaming each prediction to more easily + eagerly debug.

        >>> results = asyncio.run(
        ...     aevaluate(
        ...         apredict,
        ...         data=dataset_name,
        ...         evaluators=[accuracy],
        ...         summary_evaluators=[precision],
        ...         experiment_prefix="My Streaming Experiment",
        ...         description="Streaming predictions for debugging.",
        ...         blocking=False,
        ...     )
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...

        >>> async def aenumerate(iterable):
        ...     async for elem in iterable:
        ...         print(elem)
        >>> asyncio.run(aenumerate(results))

        Running without concurrency:

        >>> results = asyncio.run(
        ...     aevaluate(
        ...         apredict,
        ...         data=dataset_name,
        ...         evaluators=[accuracy],
        ...         summary_evaluators=[precision],
        ...         experiment_prefix="My Experiment Without Concurrency",
        ...         description="This was run without concurrency.",
        ...         max_concurrency=0,
        ...     )
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...

        Using Async evaluators:

        >>> async def helpfulness(run: Run, example: Example):
        ...     # Row-level evaluator for helpfulness.
        ...     await asyncio.sleep(5)  # Replace with your LLM API call
        ...     return {"score": run.outputs["output"] == "Yes"}

        >>> results = asyncio.run(
        ...     aevaluate(
        ...         apredict,
        ...         data=dataset_name,
        ...         evaluators=[helpfulness],
        ...         summary_evaluators=[precision],
        ...         experiment_prefix="My Helpful Experiment",
        ...         description="Applying async evaluators example.",
        ...     )
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...


    !!! warning "Behavior changed in `langsmith` 0.2.0"

        'max_concurrency' default updated from None (no limit on concurrency)
        to 0 (no concurrency at all).
    """  # noqa: E501
    if isinstance(target, (str, uuid.UUID, schemas.TracerSession)):
        # Re-evaluating an existing experiment: options that only make sense
        # when creating a new experiment must not be supplied.
        invalid_args = {
            "num_repetitions": num_repetitions > 1,
            "experiment": bool(experiment),
            "upload_results": not upload_results,
            "experiment_prefix": bool(experiment_prefix),
            "data": bool(data),
        }
        if any(invalid_args.values()):
            msg = (
                f"Received invalid arguments. "
                f"{tuple(k for k, v in invalid_args.items() if v)} should not be "
                f"specified when target is an existing experiment."
            )
            raise ValueError(msg)
        target_id = target if isinstance(target, (str, uuid.UUID)) else target.id
        # Lazy %-formatting: the message is only built when DEBUG is enabled.
        logger.debug("Running evaluation over existing experiment %s...", target_id)
        return await aevaluate_existing(
            target,
            evaluators=evaluators,
            summary_evaluators=summary_evaluators,
            metadata=metadata,
            max_concurrency=max_concurrency,
            client=client,
            blocking=blocking,
            **kwargs,
        )
    elif isinstance(target, (list, tuple)):
        msg = (
            "Running a comparison of two existing experiments asynchronously is not "
            "currently supported. Please use the `evaluate()` method instead and make "
            "sure that your evaluators are defined as synchronous functions."
        )
        raise ValueError(msg)
    elif kwargs:
        msg = (
            f"Received unsupported arguments {kwargs}. These arguments are not "
            f"supported when creating a new experiment."
        )
        raise ValueError(msg)
    elif not data:
        msg = "Must specify 'data' when running evaluations over a target function."
        raise ValueError(msg)
    elif experiment and experiment_prefix:
        msg = (
            "Expected at most one of 'experiment' or 'experiment_prefix',"
            " but both were provided. "
            f"Got: experiment={experiment}, experiment_prefix={experiment_prefix}"
        )
        raise ValueError(msg)
    else:
        if not upload_results:
            _warn_once("'upload_results' parameter is in beta.")
        # Lazy %-formatting avoids stringifying the target unless DEBUG is on.
        logger.debug("Running evaluation over target system %s...", target)
        return await _aevaluate(
            target,
            data=data,
            evaluators=evaluators,
            summary_evaluators=summary_evaluators,
            metadata=metadata,
            experiment_prefix=experiment_prefix,
            description=description,
            max_concurrency=max_concurrency,
            num_repetitions=num_repetitions,
            client=client,
            blocking=blocking,
            experiment=experiment,
            upload_results=upload_results,
            error_handling=error_handling,
        )
async def aevaluate_existing(
    experiment: Union[str, uuid.UUID, schemas.TracerSession],
    /,
    evaluators: Optional[Sequence[Union[EVALUATOR_T, AEVALUATOR_T]]] = None,
    summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
    metadata: Optional[dict] = None,
    max_concurrency: Optional[int] = 0,
    client: Optional[langsmith.Client] = None,
    load_nested: bool = False,
    blocking: bool = True,
) -> AsyncExperimentResults:
    r"""Run evaluators asynchronously over the runs of an existing experiment.

    The experiment's runs and the examples they reference are loaded first,
    then handed to the shared evaluation pipeline.

    Args:
        experiment (Union[str, uuid.UUID, schemas.TracerSession]): The experiment
            to evaluate, given as an identifier or an already loaded session.
        evaluators (Optional[Sequence[EVALUATOR_T]]): Evaluators applied to each
            individual run.
        summary_evaluators (Optional[Sequence[SUMMARY_EVALUATOR_T]]): Evaluators
            applied over the entire dataset.
        metadata (Optional[dict]): Metadata to include in the evaluation results.
        max_concurrency (int | None): The maximum number of concurrent
            evaluations to run.

            If `None` then no limit is set. If `0` then no concurrency.
        client (Optional[langsmith.Client]): The LangSmith client to use. When
            omitted, the cached client is used.
        load_nested: Whether to load all child runs for the experiment.

            Default is to only load the top-level root runs.
        blocking (bool): Whether to block until evaluation is complete.

    Returns:
        An async iterator over the experiment results.

    Examples:
        Define your evaluators

        >>> from typing import Sequence
        >>> from langsmith.schemas import Example, Run
        >>> def accuracy(run: Run, example: Example):
        ...     # Row-level evaluator for accuracy.
        ...     pred = run.outputs["output"]
        ...     expected = example.outputs["answer"]
        ...     return {"score": expected.lower() == pred.lower()}
        >>> def precision(runs: Sequence[Run], examples: Sequence[Example]):
        ...     # Experiment-level evaluator for precision.
        ...     # TP / (TP + FP)
        ...     predictions = [run.outputs["output"].lower() for run in runs]
        ...     expected = [example.outputs["answer"].lower() for example in examples]
        ...     # yes and no are the only possible answers
        ...     tp = sum([p == e for p, e in zip(predictions, expected) if p == "yes"])
        ...     fp = sum([p == "yes" and e == "no" for p, e in zip(predictions, expected)])
        ...     return {"score": tp / (tp + fp)}

        Load the experiment and run the evaluation.

        >>> import asyncio
        >>> import uuid
        >>> from langsmith import Client, aevaluate, aevaluate_existing
        >>> client = Client()
        >>> dataset_name = "__doctest_aevaluate_existing_" + uuid.uuid4().hex[:8]
        >>> dataset = client.create_dataset(dataset_name)
        >>> example = client.create_example(
        ...     inputs={"question": "What is 2+2?"},
        ...     outputs={"answer": "4"},
        ...     dataset_id=dataset.id,
        ... )
        >>> async def apredict(inputs: dict) -> dict:
        ...     await asyncio.sleep(0.001)
        ...     return {"output": "4"}
        >>> results = asyncio.run(
        ...     aevaluate(
        ...         apredict, data=dataset_name, experiment_prefix="doctest_experiment"
        ...     )
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...
        >>> experiment_id = results.experiment_name
        >>> # Consume all results to ensure evaluation is complete
        >>> async def consume_results():
        ...     result_list = [r async for r in results]
        ...     return len(result_list) > 0
        >>> asyncio.run(consume_results())
        True
        >>> import time
        >>> time.sleep(3)
        >>> results = asyncio.run(
        ...     aevaluate_existing(
        ...         experiment_id,
        ...         evaluators=[accuracy],
        ...         summary_evaluators=[precision],
        ...     )
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...
        >>> client.delete_dataset(dataset_id=dataset.id)


    """  # noqa: E501
    if not client:
        client = run_trees.get_cached_client()

    # An already-loaded session is used as-is; identifiers are resolved in a
    # worker thread so the blocking client call does not stall the event loop.
    if isinstance(experiment, schemas.TracerSession):
        session = experiment
    else:
        session = await aitertools.aio_to_thread(_load_experiment, experiment, client)

    traces = await aitertools.aio_to_thread(
        _load_traces, experiment, client, load_nested=load_nested
    )
    examples_by_id = await aitertools.aio_to_thread(
        _load_examples_map, client, session
    )
    # Pair every run with the example it was produced from, in run order.
    matched_examples = [examples_by_id[run.reference_example_id] for run in traces]

    return await _aevaluate(
        traces,
        data=matched_examples,
        evaluators=evaluators,
        summary_evaluators=summary_evaluators,
        metadata=metadata,
        max_concurrency=max_concurrency,
        client=client,
        blocking=blocking,
        experiment=session,
    )
async def _aevaluate(
    target: Union[ATARGET_T, AsyncIterable[dict], Iterable[schemas.Run], Runnable],
    /,
    data: Union[DATA_T, AsyncIterable[schemas.Example]],
    evaluators: Optional[Sequence[Union[EVALUATOR_T, AEVALUATOR_T]]] = None,
    summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
    metadata: Optional[dict] = None,
    experiment_prefix: Optional[str] = None,
    description: Optional[str] = None,
    max_concurrency: Optional[int] = None,
    num_repetitions: int = 1,
    client: Optional[langsmith.Client] = None,
    blocking: bool = True,
    experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]] = None,
    upload_results: bool = True,
    error_handling: Literal["log", "ignore"] = "log",
) -> AsyncExperimentResults:
    """Build the experiment manager and wire up predictions and evaluators.

    ``target`` is treated as something to execute when it is a coroutine
    function, an object whose ``__aiter__()`` returns a coroutine, or a
    langchain runnable. Otherwise it is treated as an iterable of existing
    runs to score.

    Returns:
        The ``AsyncExperimentResults`` wrapping the configured manager. When
        ``blocking`` is true the results are awaited before returning.
    """
    if asyncio.iscoroutinefunction(target):
        is_async_target = True
    elif hasattr(target, "__aiter__") and asyncio.iscoroutine(target.__aiter__()):
        is_async_target = True
    else:
        is_async_target = bool(_is_langchain_runnable(target))

    client = client or rt.get_cached_client()
    # Only a non-executable target carries pre-existing runs.
    runs = None if is_async_target else cast(Iterable[schemas.Run], target)
    experiment_, runs = await aitertools.aio_to_thread(
        _resolve_experiment,
        experiment,
        runs,
        client,
    )

    # Count how many callables (target + evaluators) consume attachments.
    target_attachments = int(_target_include_attachments(target))
    num_include_attachments = target_attachments + _evaluators_include_attachments(
        evaluators
    )

    unstarted_manager = _AsyncExperimentManager(
        data,
        client=client,
        metadata=metadata,
        experiment=experiment_ or experiment_prefix,
        description=description,
        num_repetitions=num_repetitions,
        runs=runs,
        include_attachments=num_include_attachments > 0,
        reuse_attachments=num_repetitions * num_include_attachments > 1,
        upload_results=upload_results,
        error_handling=error_handling,
    )
    manager = await unstarted_manager.astart()

    cache_path = None
    cache_dir = ls_utils.get_cache_dir(None)
    if cache_dir is not None:
        dsid = await manager.get_dataset_id()
        cache_path = pathlib.Path(cache_dir) / f"{dsid}.yaml"

    with ls_utils.with_optional_cache(cache_path, ignore_hosts=[client.api_url]):
        if is_async_target and evaluators:
            # Run predictions and evaluations in a single pipeline
            manager = await manager.awith_predictions_and_evaluators(
                cast(ATARGET_T, target), evaluators, max_concurrency=max_concurrency
            )
        elif is_async_target:
            manager = await manager.awith_predictions(
                cast(ATARGET_T, target), max_concurrency=max_concurrency
            )
        elif evaluators:
            manager = await manager.awith_evaluators(
                evaluators, max_concurrency=max_concurrency
            )

        # Summary evaluators are attached last regardless of the target kind.
        if summary_evaluators:
            manager = await manager.awith_summary_evaluators(summary_evaluators)

        results = AsyncExperimentResults(manager)
        if blocking:
            await results.wait()
        return results
538class _AsyncExperimentManager(_ExperimentManagerMixin):
539 """Manage the execution of experiments asynchronously.
541 Supports lazily running predictions and evaluations in parallel to facilitate
542 result streaming and early debugging.
544 Args:
545 data (DATA_T): The data used for the experiment. Can be a dataset name or ID OR
546 a generator of examples.
547 runs (Optional[Iterable[schemas.Run]]): The runs associated with the experiment
548 predictions.
549 experiment (Optional[schemas.TracerSession]): The tracer session
550 associated with the experiment.
551 experiment_prefix (Optional[str]): The prefix for the experiment name.
552 description (Optional[str]): The description for the experiment.
553 metadata (Optional[dict]): Additional metadata for the experiment.
554 client (Optional[langsmith.Client]): The Langsmith client used for
555 the experiment.
556 evaluation_results (Optional[Iterable[EvaluationResults]]): The evaluation
557 results for the experiment.
558 summary_results (Optional[Iterable[EvaluationResults]]): The aggregate results
559 for the experiment.
560 num_repetitions (Optional[int], default=1): The number of repetitions for
561 the experiment.
562 include_attachments (Optional[bool], default=False): Whether to include
563 attachments. This is used for when we pull the examples for the experiment.
564 reuse_attachments (Optional[bool], default=False): Whether to reuse attachments
565 from examples. This is True if we need to reuse attachments across multiple
566 target/evaluator functions.
567 upload_results (Optional[bool], default=True): Whether to upload results
568 to Langsmith.
569 attachment_raw_data_dict (Optional[dict]): A dictionary to store raw data
570 for attachments. Only used if we reuse attachments across multiple
571 target/evaluator functions.
572 error_handling (str, default="log"): How to handle individual run errors.
574 `'log'` will trace the runs with the error message as part of the
575 experiment, `'ignore'` will not count the run as part of the experiment at
576 all.
577 """
def __init__(
    self,
    data: Union[DATA_T, AsyncIterable[schemas.Example]],
    /,
    experiment: Optional[Union[schemas.TracerSession, str]] = None,
    metadata: Optional[dict] = None,
    runs: Optional[Union[Iterable[schemas.Run], AsyncIterable[schemas.Run]]] = None,
    client: Optional[langsmith.Client] = None,
    evaluation_results: Optional[AsyncIterable[EvaluationResults]] = None,
    summary_results: Optional[AsyncIterable[EvaluationResults]] = None,
    description: Optional[str] = None,
    num_repetitions: int = 1,
    include_attachments: bool = False,
    reuse_attachments: bool = False,
    upload_results: bool = True,
    attachment_raw_data_dict: Optional[dict] = None,
    error_handling: Literal["log", "ignore"] = "log",
):
    """Record the experiment inputs and options on the instance.

    ``experiment``, ``metadata``, ``client`` and ``description`` are handed to
    the base-class initializer; everything else is stored on private
    attributes. Examples are not resolved here (``_examples`` starts as None).
    """
    super().__init__(
        experiment=experiment,
        metadata=metadata,
        client=client,
        description=description,
    )

    # Data source and the example stream derived from it on first use.
    self._data = data
    self._examples: Optional[AsyncIterable[schemas.Example]] = None

    # Runs may arrive as a sync or async iterable; normalize to async.
    if runs is None:
        self._runs = None
    else:
        self._runs = aitertools.ensure_async_iterator(runs)

    # Result streams.
    self._evaluation_results = evaluation_results
    self._summary_results = summary_results

    # Execution options.
    self._num_repetitions = num_repetitions
    self._include_attachments = include_attachments
    self._reuse_attachments = reuse_attachments
    self._upload_results = upload_results
    self._attachment_raw_data_dict = attachment_raw_data_dict
    self._error_handling = error_handling
617 def _reset_example_attachments(self, example: schemas.Example) -> schemas.Example:
618 """Reset attachment readers for an example.
620 This is only in the case that an attachment is going to be used by more
621 than 1 callable (target + evaluators). In that case we keep a single copy
622 of the attachment data in self._attachment_raw_data_dict, and create
623 readers from that data. This makes it so that we don't have to keep
624 copies of the same data in memory, instead we can just create readers
625 from the same data.
626 """
627 if not hasattr(example, "attachments") or not example.attachments:
628 return example
630 new_attachments: dict[str, schemas.AttachmentInfo] = {}
631 for name, attachment in example.attachments.items():
632 if (
633 self._attachment_raw_data_dict is not None
634 and str(example.id) + name in self._attachment_raw_data_dict
635 ):
636 new_attachments[name] = {
637 "presigned_url": attachment["presigned_url"],
638 "reader": io.BytesIO(
639 self._attachment_raw_data_dict[str(example.id) + name]
640 ),
641 "mime_type": attachment["mime_type"],
642 }
643 else:
644 new_attachments[name] = attachment
646 # Create a new Example instance with the updated attachments
647 return schemas.Example(
648 id=example.id,
649 created_at=example.created_at,
650 dataset_id=example.dataset_id,
651 inputs=example.inputs,
652 outputs=example.outputs,
653 metadata=example.metadata,
654 modified_at=example.modified_at,
655 source_run_id=example.source_run_id,
656 attachments=new_attachments,
657 _host_url=example._host_url,
658 _tenant_id=example._tenant_id,
659 )
async def aget_examples(self) -> AsyncIterator[schemas.Example]:
    """Return an async iterator over the experiment's examples.

    On first use the examples are resolved from ``self._data``. If attachments
    are reused, their raw bytes are read once into
    ``self._attachment_raw_data_dict``. If more than one repetition is
    requested, the examples are materialized and chained once per repetition.

    Every call splits the stored stream in two: one half is kept on the
    instance for later calls, the other is returned to the caller.
    """
    if self._examples is None:
        self._examples = _aresolve_data(
            self._data,
            client=self.client,
            include_attachments=self._include_attachments,
        )

        if self._reuse_attachments and self._attachment_raw_data_dict is None:
            snapshot, self._examples = aitertools.atee(self._examples)
            raw_bytes = {}
            async for ex in snapshot:
                for name, info in (ex.attachments or {}).items():
                    cache_key = str(ex.id) + name
                    raw_bytes[cache_key] = info["reader"].read()
            self._attachment_raw_data_dict = raw_bytes

        if self._num_repetitions > 1:
            materialized = [ex async for ex in self._examples]
            repeated = []
            for _ in range(self._num_repetitions):
                # Each repetition gets its own attachment readers.
                fresh = [self._reset_example_attachments(ex) for ex in materialized]
                repeated.append(async_iter_from_list(fresh))
            self._examples = async_chain_from_iterable(repeated)

    source = aitertools.ensure_async_iterator(self._examples)
    self._examples, examples_iter = aitertools.atee(source, 2, lock=asyncio.Lock())
    return examples_iter
async def get_dataset_id(self) -> str:
    """Return the ID of the dataset this experiment runs against, as a string.

    The experiment's ``reference_dataset_id`` is used when it is set. Otherwise
    the ID is taken from the first example.

    Raises:
        ValueError: If the ID has to come from the examples and there are none.
    """
    experiment = self._experiment
    if experiment is not None and getattr(experiment, "reference_dataset_id", None):
        return str(experiment.reference_dataset_id)

    try:
        example = await aitertools.py_anext(await self.aget_examples())
    except StopAsyncIteration:
        # An exhausted stream means "no examples", same as in ``astart``;
        # report it as ValueError instead of leaking the iterator signal.
        raise ValueError("No examples found in the dataset.") from None
    if example is None:
        raise ValueError("No examples found in the dataset.")
    return str(example.dataset_id)
async def aget_runs(self) -> AsyncIterator[schemas.Run]:
    """Yield the experiment's runs.

    The stored run stream is split in two: one half replaces ``self._runs`` so
    later calls can iterate again, the other is consumed here.

    Raises:
        ValueError: If no runs have been attached to this manager.
    """
    if self._runs is None:
        raise ValueError("Runs not loaded yet.")
    source = aitertools.ensure_async_iterator(self._runs)
    self._runs, branch = aitertools.atee(source, 2, lock=asyncio.Lock())
    async for item in branch:
        yield item
async def aget_evaluation_results(self) -> AsyncIterator[EvaluationResults]:
    """Yield one evaluation-results entry per example.

    When evaluation results exist, the stored stream is split in two: one half
    is kept on the instance, the other is yielded from. When there are none,
    an empty ``{"results": []}`` entry is yielded for every example.
    """
    if self._evaluation_results is not None:
        self._evaluation_results, branch = aitertools.atee(
            aitertools.ensure_async_iterator(self._evaluation_results),
            2,
            lock=asyncio.Lock(),
        )
        async for item in branch:
            yield item
        return

    # No evaluation results recorded: emit a placeholder per example.
    async for _example in await self.aget_examples():
        yield {"results": []}
async def astart(self) -> _AsyncExperimentManager:
    """Start the experiment and return a manager bound to its project.

    The first example is pulled to verify the data is not empty. When results
    are uploaded, it is also used to get the project; otherwise the project is
    None. The start of the experiment is then announced and the repetition
    count is recorded in the metadata.

    Raises:
        ValueError: If the data yields no examples.
    """
    # One message for both "empty" paths. The original second branch
    # concatenated the two sentences without the separating space.
    empty_msg = (
        "No examples found in the dataset. "
        "Please ensure the data provided to aevaluate is not empty."
    )
    try:
        first_example = await aitertools.py_anext(await self.aget_examples())
    except StopAsyncIteration:
        # The iterator-exhaustion signal is not useful context for callers.
        raise ValueError(empty_msg) from None
    if not first_example:
        raise ValueError(empty_msg)

    project = self._get_project(first_example) if self._upload_results else None
    self._print_experiment_start(project, first_example)
    self._metadata["num_repetitions"] = self._num_repetitions
    return self._copy(
        await self.aget_examples(),
        experiment=project,
    )
def _get_example_with_readers(self, example: schemas.Example) -> schemas.Example:
    """Return a copy of ``example`` with new readers for cached attachments.

    Attachments whose bytes are stored in ``self._attachment_raw_data_dict``
    get a new ``io.BytesIO`` reader over those bytes; all others are carried
    over untouched. A new ``schemas.Example`` is always constructed.
    """
    raw_store = self._attachment_raw_data_dict
    rebuilt: dict[str, schemas.AttachmentInfo] = {}
    for name, attachment in (example.attachments or {}).items():
        # Without a cache there is nothing to rebuild from; keep the original.
        if raw_store is None:
            rebuilt[name] = attachment
            continue
        cache_key = str(example.id) + name
        if cache_key not in raw_store:
            rebuilt[name] = attachment
            continue
        reader = io.BytesIO(raw_store[cache_key])
        rebuilt[name] = {
            "presigned_url": attachment["presigned_url"],
            "reader": reader,
            "mime_type": attachment["mime_type"],
        }

    return schemas.Example(
        id=example.id,
        created_at=example.created_at,
        dataset_id=example.dataset_id,
        inputs=example.inputs,
        outputs=example.outputs,
        metadata=example.metadata,
        modified_at=example.modified_at,
        source_run_id=example.source_run_id,
        attachments=rebuilt,
        _host_url=example._host_url,
        _tenant_id=example._tenant_id,
    )
async def awith_predictions_and_evaluators(
    self,
    target: ATARGET_T,
    evaluators: Sequence[Union[EVALUATOR_T, AEVALUATOR_T]],
    /,
    max_concurrency: Optional[int] = None,
) -> _AsyncExperimentManager:
    """Run predictions and evaluations in a single pipeline.

    This allows evaluators to process results as soon as they're available from
    the target function, rather than waiting for all predictions to complete first.

    Args:
        target: The async target to run on every example.
        evaluators: The row-level evaluators to apply to each prediction.
        max_concurrency: Upper bound passed to the concurrent iterator that
            drives the per-example tasks.

    Returns:
        A copy of this manager whose examples, runs and evaluation results are
        all fed from the combined pipeline.
    """
    evaluators = _resolve_evaluators(evaluators)

    # The feedback executor is created once and reused on later calls.
    if not hasattr(self, "_evaluation_feedback_executor"):
        self._evaluation_feedback_executor = cf.ThreadPoolExecutor(max_workers=4)

    traceable_target = _ensure_async_traceable(target)
    # Computed once up front rather than once per example, mirroring
    # ``awith_predictions``.
    include_attachments = _target_include_attachments(target)

    async def process_example(example: schemas.Example):
        # Run the target first, then the evaluators on its output.
        pred = await _aforward(
            traceable_target,
            self._get_example_with_readers(example),
            self.experiment_name,
            self._metadata,
            self.client,
            include_attachments,
            self._error_handling,
        )
        example, run = pred["example"], pred["run"]
        result = await self._arun_evaluators(
            evaluators,
            {
                "run": run,
                "example": example,
                "evaluation_results": {"results": []},
            },
            feedback_executor=self._evaluation_feedback_executor,
        )
        return result

    async def process_examples():
        """Create a single task per example.

        That task is to run the target function and all the evaluators
        sequentially.
        """
        async for example in await self.aget_examples():
            # Yield the un-awaited coroutine; the concurrency helper awaits it.
            yield process_example(example)

        await self._aend()

    # Run the per-example tasks with max-concurrency
    # This guarantees that max_concurrency is the upper limit
    # for the number of target/evaluators that can be run in parallel
    experiment_results = aitertools.aiter_with_concurrency(
        max_concurrency,
        process_examples(),
        _eager_consumption_timeout=0.001,
    )

    # One branch per projected field so each can be consumed independently.
    r1, r2, r3 = aitertools.atee(experiment_results, 3, lock=asyncio.Lock())

    return self._copy(
        (result["example"] async for result in r1),
        runs=(result["run"] async for result in r2),
        evaluation_results=(result["evaluation_results"] async for result in r3),
    )
async def awith_predictions(
    self,
    target: ATARGET_T,
    /,
    max_concurrency: Optional[int] = None,
) -> _AsyncExperimentManager:
    """Return a copy of this manager fed by running ``target`` on the examples.

    The prediction stream is split in two so that examples and runs can be
    consumed as separate async iterators.
    """
    accepts_attachments = _target_include_attachments(target)
    predictions = self._apredict(
        target,
        max_concurrency=max_concurrency,
        include_attachments=accepts_attachments,
    )
    example_branch, run_branch = aitertools.atee(
        predictions, 2, lock=asyncio.Lock()
    )
    examples = (pred["example"] async for pred in example_branch)
    runs = (pred["run"] async for pred in run_branch)
    return self._copy(examples, runs=runs)
async def awith_evaluators(
    self,
    evaluators: Sequence[Union[EVALUATOR_T, AEVALUATOR_T]],
    *,
    max_concurrency: Optional[int] = None,
) -> _AsyncExperimentManager:
    """Return a copy of this manager with ``evaluators`` applied to its runs.

    The scored stream is split three ways so that examples, runs and
    evaluation results can be consumed as separate async iterators.
    """
    resolved = _resolve_evaluators(evaluators)
    scored = self._ascore(resolved, max_concurrency=max_concurrency)
    example_branch, run_branch, eval_branch = aitertools.atee(
        scored, 3, lock=asyncio.Lock()
    )
    examples = (row["example"] async for row in example_branch)
    runs = (row["run"] async for row in run_branch)
    evaluation_results = (row["evaluation_results"] async for row in eval_branch)
    return self._copy(examples, runs=runs, evaluation_results=evaluation_results)
async def awith_summary_evaluators(
    self,
    summary_evaluators: Sequence[SUMMARY_EVALUATOR_T],
) -> _AsyncExperimentManager:
    """Return a copy of this manager with summary evaluators attached.

    The evaluators are wrapped and turned into an aggregate-feedback stream,
    which becomes the copy's ``summary_results``. Examples and runs are
    carried over from this manager.
    """
    wrapped = _wrap_summary_evaluators(summary_evaluators)
    summary_stream = self._aapply_summary_evaluators(wrapped)
    examples = await self.aget_examples()
    runs = self.aget_runs()
    return self._copy(examples, runs=runs, summary_results=summary_stream)
async def aget_results(self) -> AsyncIterator[ExperimentResultRow]:
    """Yield one ``ExperimentResultRow`` per run/example/evaluation triple.

    Runs, examples and evaluation results are zipped together in lockstep.
    """
    run_stream = self.aget_runs()
    example_stream = await self.aget_examples()
    evaluation_stream = self.aget_evaluation_results()
    zipped = aitertools.async_zip(run_stream, example_stream, evaluation_stream)
    async for run, example, evaluation_results in zipped:
        yield ExperimentResultRow(
            run=run,
            example=example,
            evaluation_results=evaluation_results,
        )
async def aget_summary_scores(self) -> dict[str, list[dict]]:
    """Flatten every summary result batch into a single ``results`` list.

    Returns:
        ``{"results": []}`` when no summary results were configured,
        otherwise a dict whose ``"results"`` list holds the entries of each
        batch's ``"results"`` value in the order they are produced.
    """
    if self._summary_results is None:
        return {"results": []}

    flattened: list = []
    async for batch in self._summary_results:
        flattened.extend(batch["results"])
    return {"results": flattened}
914 ## Private methods
async def _apredict(
    self,
    target: ATARGET_T,
    /,
    max_concurrency: Optional[int] = None,
    include_attachments: bool = False,
) -> AsyncIterator[_ForwardResults]:
    """Run the target on every example and yield the forward results.

    One ``_aforward`` coroutine is created per example and the coroutines are
    driven through ``aitertools.aiter_with_concurrency``. Once the stream is
    exhausted, ``_aend`` is awaited.

    Args:
        target: The async target; wrapped by ``_ensure_async_traceable``.
        max_concurrency: Passed to ``aiter_with_concurrency``.
        include_attachments: Forwarded to ``_aforward``.
    """
    traced_target = _ensure_async_traceable(target)

    async def pending_predictions():
        examples = await self.aget_examples()
        async for example in examples:
            # Hand back the un-awaited coroutine; it is awaited by the
            # concurrency helper below.
            yield _aforward(
                traced_target,
                self._get_example_with_readers(example),
                self.experiment_name,
                self._metadata,
                self.client,
                include_attachments,
                self._error_handling,
            )

    forward_results = aitertools.aiter_with_concurrency(
        max_concurrency, pending_predictions(), _eager_consumption_timeout=0.001
    )
    async for forward_result in forward_results:
        yield forward_result

    await self._aend()
async def _ascore(
    self,
    evaluators: Sequence[RunEvaluator],
    max_concurrency: Optional[int] = None,
) -> AsyncIterator[ExperimentResultRow]:
    """Apply the evaluators to every result row and yield the scored rows.

    A four-worker thread pool is kept open for the duration of the iteration
    and handed to ``_arun_evaluators`` as its feedback executor.

    Args:
        evaluators: Evaluators applied to each row.
        max_concurrency: Passed to ``aiter_with_concurrency``.
    """
    with cf.ThreadPoolExecutor(max_workers=4) as pool:

        async def pending_scores():
            async for row in self.aget_results():
                # Un-awaited coroutine; awaited by aiter_with_concurrency.
                yield self._arun_evaluators(
                    evaluators, row, feedback_executor=pool
                )

        scored_rows = aitertools.aiter_with_concurrency(
            max_concurrency, pending_scores(), _eager_consumption_timeout=0.001
        )
        async for scored_row in scored_rows:
            yield scored_row
async def _arun_evaluators(
    self,
    evaluators: Sequence[RunEvaluator],
    current_results: ExperimentResultRow,
    feedback_executor: cf.ThreadPoolExecutor,
) -> ExperimentResultRow:
    """Run every evaluator on one result row and collect the feedback.

    Evaluators are awaited one after another inside a tracing context whose
    project name is ``"evaluators"``. Results selected from each evaluator
    response are appended to the row's existing ``evaluation_results``.

    If an evaluator raises, the error is always logged. In addition, an
    error ``EvaluationResult`` is built for each feedback key reported by
    ``_extract_feedback_keys`` and recorded in place of the real results;
    if that fallback itself fails, the evaluator contributes nothing.

    Args:
        evaluators: Evaluators to apply.
        current_results: Row holding the run, example and prior results.
        feedback_executor: Executor passed to the client when logging
            feedback (only used when results are uploaded).

    Returns:
        A row with the same run and example and the extended results.
    """
    current_context = rh.get_tracing_context()
    metadata = {
        **(current_context["metadata"] or {}),
        **{"experiment": self.experiment_name},
    }
    with rh.tracing_context(
        **{
            **current_context,
            "project_name": "evaluators",
            "metadata": metadata,
            "enabled": "local" if not self._upload_results else True,
            "client": self.client,
        }
    ):
        run = current_results["run"]
        example = current_results["example"]
        eval_results = current_results["evaluation_results"]

        async def _run_single_evaluator(evaluator: RunEvaluator):
            evaluator_run_id = uuid.uuid4()
            try:
                evaluator_response = await evaluator.aevaluate_run(  # type: ignore[call-arg]
                    run=run,
                    example=self._get_example_with_readers(example),
                    evaluator_run_id=evaluator_run_id,
                )
                selected_results = self.client._select_eval_results(
                    evaluator_response
                )

                if self._upload_results:
                    self.client._log_evaluation_feedback(
                        evaluator_response, run=run, _executor=feedback_executor
                    )
                return selected_results
            except Exception as e:
                # Log first so the failure is visible even when the fallback
                # error feedback below is recorded successfully and returned.
                logger.error(
                    f"Error running evaluator {repr(evaluator)} on"
                    f" run {run.id}: {repr(e)}",
                    exc_info=True,
                )
                try:
                    feedback_keys = _extract_feedback_keys(evaluator)

                    error_response = EvaluationResults(
                        results=[
                            EvaluationResult(
                                key=key,
                                source_run_id=evaluator_run_id,
                                comment=repr(e),
                                extra={"error": True},
                            )
                            for key in feedback_keys
                        ]
                    )
                    selected_results = self.client._select_eval_results(
                        error_response
                    )
                    if self._upload_results:
                        self.client._log_evaluation_feedback(
                            error_response, run=run, _executor=feedback_executor
                        )
                    return selected_results
                except Exception as e2:
                    # Best effort only: the original error is already logged.
                    logger.debug(f"Error parsing feedback keys: {e2}")
                return None

        all_results = []
        for evaluator in evaluators:
            all_results.append(await _run_single_evaluator(evaluator))

        for result in all_results:
            # None means the evaluator failed and no fallback was produced.
            if result is not None:
                eval_results["results"].extend(result)
        return ExperimentResultRow(
            run=run,
            example=example,
            evaluation_results=eval_results,
        )
async def _aapply_summary_evaluators(
    self, summary_evaluators: Sequence[SUMMARY_EVALUATOR_T]
) -> AsyncIterator[EvaluationResults]:
    """Run the summary evaluators over all runs and examples.

    All runs and examples are first gathered into lists. Each evaluator is
    then called with those lists inside a tracing context whose project name
    is ``"evaluators"``. When results are uploaded, every flattened result is
    sent through ``client.create_feedback`` on a worker thread. An evaluator
    that raises is logged and skipped.

    Yields:
        A single dict whose ``"results"`` entry holds all collected feedback.
    """
    example_stream = aitertools.ensure_async_iterator(await self.aget_examples())
    runs: list = []
    examples: list = []
    async for run, example in aitertools.async_zip(self.aget_runs(), example_stream):
        runs.append(run)
        examples.append(example)

    collected: list = []
    if self._upload_results:
        project_id = self._get_experiment().id
    else:
        project_id = None

    tracing_ctx = rh.get_tracing_context()
    metadata = dict(tracing_ctx["metadata"] or {})
    metadata["experiment"] = self.experiment_name
    metadata["experiment_id"] = project_id

    ctx_kwargs = dict(tracing_ctx)
    ctx_kwargs["project_name"] = "evaluators"
    ctx_kwargs["metadata"] = metadata
    ctx_kwargs["enabled"] = True if self._upload_results else "local"
    ctx_kwargs["client"] = self.client

    with rh.tracing_context(**ctx_kwargs):
        for evaluator in summary_evaluators:
            try:
                outcome = evaluator(runs, examples)
                flattened = self.client._select_eval_results(
                    outcome,
                    fn_name=evaluator.__name__,
                )
                collected.extend(flattened)
                if not self._upload_results:
                    continue
                for item in flattened:
                    payload = item.dict(exclude={"target_run_id"})
                    source_info = payload.pop("evaluator_info", None)
                    await aitertools.aio_to_thread(
                        self.client.create_feedback,
                        **payload,
                        run_id=None,
                        project_id=project_id,
                        source_info=source_info,
                    )
            except Exception as e:
                logger.error(
                    f"Error running summary evaluator {repr(evaluator)}: {e}",
                    exc_info=True,
                )
    yield {"results": collected}
1105 async def _get_dataset_version(self) -> Optional[str]:
1106 modified_at = []
1107 async for example in await self.aget_examples():
1108 if example.modified_at:
1109 # Should always be defined in practice when fetched,
1110 # but the typing permits None
1111 modified_at.append(example.modified_at)
1113 max_modified_at = max(modified_at) if modified_at else None
1114 return max_modified_at.isoformat() if max_modified_at else None
1116 async def _get_dataset_splits(self) -> Optional[list[str]]:
1117 splits = set()
1118 async for example in await self.aget_examples():
1119 if (
1120 example.metadata
1121 and example.metadata.get("dataset_split")
1122 and isinstance(example.metadata["dataset_split"], list)
1123 ):
1124 for split in example.metadata["dataset_split"]:
1125 if isinstance(split, str):
1126 splits.add(split)
1127 else:
1128 splits.add("base")
1130 return list(splits)
async def _aend(self) -> None:
    """Write the final dataset version and splits to the experiment project.

    Does nothing when results are not uploaded.

    Raises:
        ValueError: If no experiment has been set on the manager.
    """
    if not self._upload_results:
        return

    experiment = self._experiment
    if experiment is None:
        raise ValueError("Experiment not started yet.")

    updates = self._get_experiment_metadata()
    updates["dataset_version"] = await self._get_dataset_version()
    updates["dataset_splits"] = await self._get_dataset_splits()

    # Entries computed here override same-named keys already on the project.
    merged = dict(experiment.metadata)
    merged.update(updates)
    self.client.update_project(experiment.id, metadata=merged)
1150 def _copy(self, *args: Any, **kwargs: Any) -> _AsyncExperimentManager:
1151 default_args = (self._data,)
1152 default_kwargs = {
1153 "experiment": self._experiment,
1154 "metadata": self._metadata,
1155 "runs": self._runs,
1156 "client": self.client,
1157 "evaluation_results": self._evaluation_results,
1158 "summary_results": self._summary_results,
1159 "include_attachments": self._include_attachments,
1160 "reuse_attachments": self._reuse_attachments,
1161 "upload_results": self._upload_results,
1162 "attachment_raw_data_dict": self._attachment_raw_data_dict,
1163 "error_handling": self._error_handling,
1164 }
1165 full_args = list(args) + list(default_args[len(args) :])
1166 full_kwargs = {**default_kwargs, **kwargs}
1167 return self.__class__(*full_args, **full_kwargs)
class AsyncExperimentResults:
    """Async-iterable view over the result rows of an experiment.

    On construction a background task starts draining
    ``experiment_manager.aget_results()`` into an in-memory list. Iterating
    the object yields those rows in order and suspends, without polling,
    whenever the iterator has caught up with the background task.

    Must be created while an event loop is running, because the background
    task is started with ``asyncio.create_task``.
    """

    def __init__(
        self,
        experiment_manager: _AsyncExperimentManager,
    ):
        self._manager = experiment_manager
        self._results: list[ExperimentResultRow] = []
        self._lock = asyncio.Lock()
        # Set whenever a row is appended or the background task finishes, so
        # a waiting consumer is woken exactly when there is something to do.
        self._progress = asyncio.Event()
        self._task = asyncio.create_task(self._process_data(self._manager))
        self._task.add_done_callback(lambda _finished: self._progress.set())
        self._processed_count = 0

    @property
    def experiment_name(self) -> str:
        """Name of the experiment, as reported by the manager."""
        return self._manager.experiment_name

    def __aiter__(self) -> AsyncIterator[ExperimentResultRow]:
        return self

    async def __anext__(self) -> ExperimentResultRow:
        """Return the next buffered row, waiting for the producer if needed.

        Raises:
            StopAsyncIteration: When every buffered row has been returned
                and the background task is done.
        """
        while True:
            async with self._lock:
                if self._processed_count < len(self._results):
                    result = self._results[self._processed_count]
                    self._processed_count += 1
                    return result
                elif self._task.done():
                    raise StopAsyncIteration
                # Caught up with the producer. Clear while still holding the
                # lock so an append (which also takes the lock) cannot slip
                # in between the check above and the wait below.
                self._progress.clear()

            await self._progress.wait()

    async def _process_data(self, manager: _AsyncExperimentManager) -> None:
        """Buffer every result row, then store the summary scores."""
        tqdm = _load_tqdm()
        async for item in tqdm(manager.aget_results()):
            async with self._lock:
                self._results.append(item)
                self._progress.set()
        summary_scores = await manager.aget_summary_scores()
        async with self._lock:
            self._summary_results = summary_scores

    def to_pandas(
        self, start: Optional[int] = 0, end: Optional[int] = None
    ) -> DataFrame:
        """Convert the rows buffered so far via ``_to_pandas``."""
        return _to_pandas(self._results, start=start, end=end)

    def _repr_html_(self) -> str:
        """Return an HTML preview of the first rows when pandas is available."""
        import importlib.util

        if self._results and importlib.util.find_spec("pandas"):
            df = self.to_pandas(0, 5)
            return df._repr_html_()  # type: ignore[operator]
        else:
            return self.__repr__()

    def __len__(self) -> int:
        """Number of rows buffered so far."""
        return len(self._results)

    def __repr__(self) -> str:
        return f"<AsyncExperimentResults {self.experiment_name}>"

    async def wait(self) -> None:
        """Wait for the background task to finish."""
        await self._task
async def _aforward(
    fn: rh.SupportsLangsmithExtra[[dict], Awaitable],
    example: schemas.Example,
    experiment_name: str,
    metadata: dict,
    client: langsmith.Client,
    include_attachments: bool = False,
    error_handling: Literal["log", "ignore"] = "log",
) -> _ForwardResults:
    """Invoke the traced target on one example and capture its run.

    The run is captured through the ``on_end`` hook passed in
    ``langsmith_extra``. With ``error_handling="log"`` the reference example
    id is set in ``langsmith_extra`` up front; with ``"ignore"`` it is set by
    the ``_on_success`` hook instead. Exceptions raised while calling the
    target are logged and not re-raised.

    Args:
        fn: Traced async target.
        example: Example whose attributes supply the target's arguments.
        experiment_name: Used as the tracing project name.
        metadata: Base run metadata; ``example_version`` is added to it.
        client: Client passed through ``langsmith_extra``.
        include_attachments: Accepted but not used in this function body.
        error_handling: Either ``"log"`` or ``"ignore"``.

    Returns:
        The captured run together with the example.

    Raises:
        ValueError: If ``error_handling`` is not a recognised value.
    """
    run: Optional[schemas.RunBase] = None

    def _capture_run(finished: run_trees.RunTree) -> None:
        nonlocal run
        run = finished

    def _link_example(tree: rt.RunTree) -> None:
        tree.reference_example_id = example.id

    version_stamp = example.modified_at or example.created_at
    run_metadata = {**metadata, "example_version": version_stamp.isoformat()}
    langsmith_extra = rh.LangSmithExtra(
        on_end=_capture_run,
        project_name=experiment_name,
        metadata=run_metadata,
        client=client,
    )

    if error_handling == "log":
        langsmith_extra["reference_example_id"] = example.id
    elif error_handling == "ignore":
        langsmith_extra["_on_success"] = _link_example
    else:
        raise ValueError(f"Unrecognized error_handling value: {error_handling=}")

    with rh.tracing_context(enabled=True):
        try:
            positional = [getattr(example, name) for name in _get_target_args(fn)]
            await fn(*positional, langsmith_extra=langsmith_extra)
        except Exception as e:
            logger.error(
                f"Error running target function: {e}", exc_info=True, stacklevel=1
            )
        return _ForwardResults(
            run=cast(schemas.Run, run),
            example=example,
        )
def _ensure_async_traceable(
    target: ATARGET_T,
) -> rh.SupportsLangsmithExtra[[dict], Awaitable]:
    """Return ``target`` as a traceable async callable.

    A target that is already traceable is returned unchanged. A LangChain
    runnable is replaced by its ``ainvoke`` method before being wrapped with
    ``rh.traceable`` under the name ``"AsyncTarget"``.

    Raises:
        ValueError: If ``target`` is neither a coroutine function nor a
            LangChain runnable; the message differs for callables and
            non-callables.
    """
    if not (asyncio.iscoroutinefunction(target) or _is_langchain_runnable(target)):
        # Both error messages end with the same usage snippet.
        usage = (
            "async def predict(inputs: dict) -> dict:\n"
            " # do work, like chain.invoke(inputs)\n"
            " return {...}\n"
            "await aevaluate(predict, ...)"
        )
        if callable(target):
            raise ValueError(
                "Target must be an async function. For sync functions, use evaluate."
                " Example usage:\n\n" + usage
            )
        raise ValueError(
            "Target must be a callable async function. "
            "Received a non-callable object. Example usage:\n\n" + usage
        )

    if rh.is_traceable_function(target):
        return target  # type: ignore

    if _is_langchain_runnable(target):
        target = target.ainvoke  # type: ignore[union-attr]
    return rh.traceable(name="AsyncTarget")(target)  # type: ignore[arg-type]
def _aresolve_data(
    data: Union[DATA_T, AsyncIterable[schemas.Example]],
    *,
    client: langsmith.Client,
    include_attachments: bool = False,
) -> AsyncIterator[schemas.Example]:
    """Return the examples for the given dataset as an async iterator.

    Async iterables are wrapped directly; any other input is first resolved
    with ``_resolve_data``.
    """
    if not isinstance(data, AsyncIterable):
        resolved = _resolve_data(
            data, client=client, include_attachments=include_attachments
        )
        return aitertools.ensure_async_iterator(resolved)
    return aitertools.ensure_async_iterator(data)
T = TypeVar("T")


async def async_chain_from_iterable(
    iterable: Iterable[AsyncIterable[T]],
) -> AsyncIterator[T]:
    """Chain multiple async iterables.

    Each async iterable is exhausted in turn, in the order given, and its
    items are yielded as one continuous stream.
    """
    for source in iterable:
        async for element in source:
            yield element
async def async_iter_from_list(
    examples: list[schemas.Example],
) -> AsyncIterable[schemas.Example]:
    """Convert a list of examples to an async iterable.

    Items are yielded one by one in list order.
    """
    for item in examples:
        yield item