Coverage for langsmith/evaluation/_runner.py: 48%

770 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-12-11 16:15 -0800

1"""V2 Evaluation Interface.""" 

2 

3from __future__ import annotations 

4 

5import ast 

6import collections 

7import concurrent.futures as cf 

8import functools 

9import inspect 

10import io 

11import itertools 

12import logging 

13import pathlib 

14import queue 

15import random 

16import textwrap 

17import threading 

18import uuid 

19from collections.abc import Awaitable, Generator, Iterable, Iterator, Sequence 

20from contextvars import copy_context 

21from typing import ( 

22 TYPE_CHECKING, 

23 Any, 

24 Callable, 

25 Literal, 

26 Optional, 

27 TypeVar, 

28 Union, 

29 cast, 

30) 

31 

32from typing_extensions import TypedDict, overload 

33 

34import langsmith 

35from langsmith import env as ls_env 

36from langsmith import run_helpers as rh 

37from langsmith import run_trees as rt 

38from langsmith import schemas 

39from langsmith import utils as ls_utils 

40from langsmith._internal._beta_decorator import _warn_once 

41from langsmith.evaluation.evaluator import ( 

42 SUMMARY_EVALUATOR_T, 

43 ComparisonEvaluationResult, 

44 DynamicComparisonRunEvaluator, 

45 DynamicRunEvaluator, 

46 EvaluationResult, 

47 EvaluationResults, 

48 RunEvaluator, 

49 _normalize_summary_evaluator, 

50 comparison_evaluator, 

51 run_evaluator, 

52) 

53from langsmith.evaluation.integrations import LangChainStringEvaluator 

54 

55if TYPE_CHECKING: 

56 import pandas as pd 

57 from langchain_core.runnables import Runnable 

58 

59 DataFrame = pd.DataFrame 

60else: 

61 DataFrame = Any 

62logger = logging.getLogger(__name__) 

63 

# A target system: a callable that receives one dict (or two dicts) and
# returns a dict.
# NOTE(review): the meaning of the optional second dict argument is not
# visible in this part of the file — confirm against `_evaluate`.
TARGET_T = Union[Callable[[dict], dict], Callable[[dict, dict], dict]]
# Data format: dataset-name, dataset_id, or examples
DATA_T = Union[str, uuid.UUID, Iterable[schemas.Example], schemas.Dataset]
# NOTE: the summary-evaluator alias (SUMMARY_EVALUATOR_T), which runs over the
# whole dataset and reports aggregate metric(s), is imported from
# `langsmith.evaluation.evaluator` rather than being defined here.
# Row-level evaluator: either a `RunEvaluator` instance or a plain callable
# returning an evaluation result (or a dict).
EVALUATOR_T = Union[
    RunEvaluator,
    Callable[
        [schemas.Run, Optional[schemas.Example]],
        Union[EvaluationResult, EvaluationResults],
    ],
    Callable[..., Union[dict, EvaluationResults, EvaluationResult]],
]
# Row-level evaluator whose result must be awaited.
AEVALUATOR_T = Union[
    Callable[
        [schemas.Run, Optional[schemas.Example]],
        Awaitable[Union[EvaluationResult, EvaluationResults]],
    ],
]
# An existing experiment, given by name/ID string, UUID, or the
# `TracerSession` object itself.
EXPERIMENT_T = Union[str, uuid.UUID, schemas.TracerSession]

85 

86 

# Typing-only overload: a target function, `Runnable`, or a single existing
# experiment yields `ExperimentResults`. `error_handling` is spelled out here
# so the overload agrees with the implementation's signature instead of
# letting it fall through to the untyped `**kwargs`.
@overload
def evaluate(
    target: Union[TARGET_T, Runnable, EXPERIMENT_T],
    /,
    data: Optional[DATA_T] = None,
    evaluators: Optional[Sequence[EVALUATOR_T]] = None,
    summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
    metadata: Optional[dict] = None,
    experiment_prefix: Optional[str] = None,
    description: Optional[str] = None,
    max_concurrency: Optional[int] = 0,
    num_repetitions: int = 1,
    client: Optional[langsmith.Client] = None,
    blocking: bool = True,
    experiment: Optional[EXPERIMENT_T] = None,
    upload_results: bool = True,
    error_handling: Literal["log", "ignore"] = "log",
    **kwargs: Any,
) -> ExperimentResults: ...

105 

106 

# Typing-only overload: a two-tuple of existing experiments selects the
# comparative (pairwise) path and yields `ComparativeExperimentResults`.
@overload
def evaluate(
    target: Union[tuple[EXPERIMENT_T, EXPERIMENT_T]],
    /,
    data: Optional[DATA_T] = None,
    evaluators: Optional[Sequence[COMPARATIVE_EVALUATOR_T]] = None,
    summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
    metadata: Optional[dict] = None,
    experiment_prefix: Optional[str] = None,
    description: Optional[str] = None,
    max_concurrency: Optional[int] = 0,
    num_repetitions: int = 1,
    client: Optional[langsmith.Client] = None,
    blocking: bool = True,
    experiment: Optional[EXPERIMENT_T] = None,
    upload_results: bool = True,
    **kwargs: Any,
) -> ComparativeExperimentResults: ...

125 

126 

def evaluate(
    target: Union[TARGET_T, Runnable, EXPERIMENT_T, tuple[EXPERIMENT_T, EXPERIMENT_T]],
    /,
    data: Optional[DATA_T] = None,
    evaluators: Optional[
        Union[Sequence[EVALUATOR_T], Sequence[COMPARATIVE_EVALUATOR_T]]
    ] = None,
    summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
    metadata: Optional[dict] = None,
    experiment_prefix: Optional[str] = None,
    description: Optional[str] = None,
    max_concurrency: Optional[int] = 0,
    num_repetitions: int = 1,
    client: Optional[langsmith.Client] = None,
    blocking: bool = True,
    experiment: Optional[EXPERIMENT_T] = None,
    upload_results: bool = True,
    error_handling: Literal["log", "ignore"] = "log",
    **kwargs: Any,
) -> Union[ExperimentResults, ComparativeExperimentResults]:
    r"""Evaluate a target system on a given dataset.

    Args:
        target (TARGET_T | Runnable | EXPERIMENT_T | Tuple[EXPERIMENT_T, EXPERIMENT_T]):
            The target system or experiment(s) to evaluate.

            Can be a function that takes a dict and returns a `dict`, a langchain `Runnable`, an
            existing experiment ID, or a two-tuple of experiment IDs.
        data (DATA_T): The dataset to evaluate on.

            Can be a dataset name, a list of examples, or a generator of examples.
        evaluators (Sequence[EVALUATOR_T] | Sequence[COMPARATIVE_EVALUATOR_T] | None):
            A list of evaluators to run on each example. The evaluator signature
            depends on the target type.
        summary_evaluators (Sequence[SUMMARY_EVALUATOR_T] | None): A list of summary
            evaluators to run on the entire dataset.

            Should not be specified if comparing two existing experiments.
        metadata (dict | None): Metadata to attach to the experiment.
        experiment_prefix (str | None): A prefix to provide for your experiment name.
        description (str | None): A free-form text description for the experiment.
        max_concurrency (int | None): The maximum number of concurrent
            evaluations to run.

            If `None` then no limit is set. If `0` then no concurrency.
        client (langsmith.Client | None): The LangSmith client to use.
        blocking (bool): Whether to block until the evaluation is complete.
        num_repetitions (int): The number of times to run the evaluation.
            Each item in the dataset will be run and evaluated this many times.
        experiment (schemas.TracerSession | None): An existing experiment to
            extend.

            Mutually exclusive with `experiment_prefix`: passing both raises a
            `ValueError`.

            For advanced usage only. Should not be specified if target is an existing
            experiment or two-tuple of experiments.
        upload_results (bool): Whether to upload the results. Passing `False` is a
            beta feature and emits a one-time warning.

            Should not be disabled if target is an existing experiment or a
            two-tuple of experiments.
        error_handling (str, default="log"): How to handle individual run errors.

            `'log'` will trace the runs with the error message as part of the
            experiment, `'ignore'` will not count the run as part of the experiment at
            all.
        **kwargs (Any): Extra keyword arguments. They are forwarded to
            `evaluate_existing` or `evaluate_comparative` when target is one or two
            existing experiments, and rejected when creating a new experiment.

    Returns:
        ExperimentResults: If target is a function, `Runnable`, or existing experiment.
        ComparativeExperimentResults: If target is a two-tuple of existing experiments.

    Raises:
        ValueError: If arguments that do not apply to the selected target type are
            given, if a tuple target is not exactly two existing experiments, if
            `data` is missing for a target function, if the target is an async
            function, or if both `experiment` and `experiment_prefix` are given.

    Examples:
        Prepare the dataset:

        >>> from typing import Sequence
        >>> from langsmith import Client
        >>> from langsmith.evaluation import evaluate
        >>> from langsmith.schemas import Example, Run
        >>> client = Client()
        >>> dataset = client.clone_public_dataset(
        ...     "https://smith.langchain.com/public/419dcab2-1d66-4b94-8901-0357ead390df/d"
        ... )
        >>> dataset_name = "Evaluate Examples"

        Basic usage:

        >>> def accuracy(run: Run, example: Example):
        ...     # Row-level evaluator for accuracy.
        ...     pred = run.outputs["output"]
        ...     expected = example.outputs["answer"]
        ...     return {"score": expected.lower() == pred.lower()}
        >>> def precision(runs: Sequence[Run], examples: Sequence[Example]):
        ...     # Experiment-level evaluator for precision.
        ...     # TP / (TP + FP)
        ...     predictions = [run.outputs["output"].lower() for run in runs]
        ...     expected = [example.outputs["answer"].lower() for example in examples]
        ...     # yes and no are the only possible answers
        ...     tp = sum([p == e for p, e in zip(predictions, expected) if p == "yes"])
        ...     fp = sum([p == "yes" and e == "no" for p, e in zip(predictions, expected)])
        ...     return {"score": tp / (tp + fp)}
        >>> def predict(inputs: dict) -> dict:
        ...     # This can be any function or just an API call to your app.
        ...     return {"output": "Yes"}
        >>> results = evaluate(
        ...     predict,
        ...     data=dataset_name,
        ...     evaluators=[accuracy],
        ...     summary_evaluators=[precision],
        ...     experiment_prefix="My Experiment",
        ...     description="Evaluating the accuracy of a simple prediction model.",
        ...     metadata={
        ...         "my-prompt-version": "abcd-1234",
        ...     },
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...

        Evaluating over only a subset of the examples

        >>> experiment_name = results.experiment_name
        >>> examples = client.list_examples(dataset_name=dataset_name, limit=5)
        >>> results = evaluate(
        ...     predict,
        ...     data=examples,
        ...     evaluators=[accuracy],
        ...     summary_evaluators=[precision],
        ...     experiment_prefix="My Experiment",
        ...     description="Just testing a subset synchronously.",
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...

        Streaming each prediction to more easily + eagerly debug.

        >>> results = evaluate(
        ...     predict,
        ...     data=dataset_name,
        ...     evaluators=[accuracy],
        ...     summary_evaluators=[precision],
        ...     description="I don't even have to block!",
        ...     blocking=False,
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...
        >>> for i, result in enumerate(results):  # doctest: +ELLIPSIS
        ...     pass

        Using the `evaluate` API with an off-the-shelf LangChain evaluator:

        >>> from langsmith.evaluation import LangChainStringEvaluator  # doctest: +SKIP
        >>> from langchain_openai import ChatOpenAI  # doctest: +SKIP
        >>> def prepare_criteria_data(run: Run, example: Example):  # doctest: +SKIP
        ...     return {
        ...         "prediction": run.outputs["output"],
        ...         "reference": example.outputs["answer"],
        ...         "input": str(example.inputs),
        ...     }
        >>> results = evaluate(  # doctest: +SKIP
        ...     predict,
        ...     data=dataset_name,
        ...     evaluators=[
        ...         accuracy,
        ...         LangChainStringEvaluator("embedding_distance"),
        ...         LangChainStringEvaluator(
        ...             "labeled_criteria",
        ...             config={
        ...                 "criteria": {
        ...                     "usefulness": "The prediction is useful if it is correct"
        ...                     " and/or asks a useful followup question."
        ...                 },
        ...                 "llm": ChatOpenAI(model="gpt-4o"),
        ...             },
        ...             prepare_data=prepare_criteria_data,
        ...         ),
        ...     ],
        ...     description="Evaluating with off-the-shelf LangChain evaluators.",
        ...     summary_evaluators=[precision],
        ... )
        View the evaluation results for experiment:...  # doctest: +SKIP

        Evaluating a LangChain object:

        >>> from langchain_core.runnables import chain as as_runnable
        >>> @as_runnable
        ... def nested_predict(inputs):
        ...     return {"output": "Yes"}
        >>> @as_runnable
        ... def lc_predict(inputs):
        ...     return nested_predict.invoke(inputs)
        >>> results = evaluate(
        ...     lc_predict.invoke,
        ...     data=dataset_name,
        ...     evaluators=[accuracy],
        ...     description="This time we're evaluating a LangChain object.",
        ...     summary_evaluators=[precision],
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...

    !!! warning "Behavior changed in `langsmith` 0.2.0"

        'max_concurrency' default updated from None (no limit on concurrency)
        to 0 (no concurrency at all).
    """  # noqa: E501
    if isinstance(target, (str, uuid.UUID, schemas.TracerSession)):
        # Single existing experiment: only re-score its runs, so arguments that
        # describe a *new* experiment are rejected.
        invalid_args = {
            "num_repetitions": num_repetitions > 1,
            "experiment": bool(experiment),
            "upload_results": not upload_results,
            "experiment_prefix": bool(experiment_prefix),
            "data": bool(data),
        }
        if any(invalid_args.values()):
            msg = (
                f"Received invalid arguments. "
                f"{tuple(k for k, v in invalid_args.items() if v)} should not be "
                f"specified when target is an existing experiment."
            )
            raise ValueError(msg)
        target_id = target if isinstance(target, (str, uuid.UUID)) else target.id
        # Lazy %-formatting: the message is only built if DEBUG is enabled.
        logger.debug("Running evaluation over existing experiment %s...", target_id)
        return evaluate_existing(
            target,
            evaluators=cast(Optional[Sequence[EVALUATOR_T]], evaluators),
            summary_evaluators=summary_evaluators,
            metadata=metadata,
            max_concurrency=max_concurrency,
            client=client,
            blocking=blocking,
            **kwargs,
        )
    elif isinstance(target, (list, tuple)):
        # Two existing experiments: pairwise (comparative) evaluation.
        invalid_args = {
            "num_repetitions": num_repetitions > 1,
            "experiment": bool(experiment),
            "upload_results": not upload_results,
            "summary_evaluators": bool(summary_evaluators),
            "data": bool(data),
        }
        if len(target) != 2 or not all(
            isinstance(t, (str, uuid.UUID, schemas.TracerSession)) for t in target
        ):
            msg = (
                "Received invalid target. If a tuple is specified it must have length "
                "2 and each element should be the ID or schemas.TracerSession of an "
                f"existing experiment. Received {target=}"
            )
            raise ValueError(msg)
        elif any(invalid_args.values()):
            msg = (
                f"Received invalid arguments. "
                f"{tuple(k for k, v in invalid_args.items() if v)} should not be "
                f"specified when target is two existing experiments."
            )
            raise ValueError(msg)
        # Only forward an explicit limit; with `None` the callee's own default
        # for `max_concurrency` applies.
        if max_concurrency is not None:
            kwargs["max_concurrency"] = max_concurrency
        target_ids = [t if isinstance(t, (str, uuid.UUID)) else t.id for t in target]
        logger.debug(
            "Running pairwise evaluation over existing experiments %s...", target_ids
        )
        return evaluate_comparative(
            target,
            evaluators=cast(Sequence[COMPARATIVE_EVALUATOR_T], evaluators or ()),
            experiment_prefix=experiment_prefix,
            description=description,
            client=client,
            metadata=metadata,
            **kwargs,
        )
    elif kwargs:
        # Extra keyword arguments are only meaningful for the two
        # existing-experiment paths above.
        msg = (
            f"Received unsupported arguments {kwargs}. These arguments are not "
            f"supported when creating a new experiment."
        )
        raise ValueError(msg)
    elif not data:
        msg = "Must specify 'data' when running evaluations over a target function."
        raise ValueError(msg)
    elif callable(target) and rh.is_async(target):
        msg = (
            "Async functions are not supported by `evaluate`. "
            "Please use `aevaluate` instead:\n\n"
            "from langsmith import aevaluate\n\n"
            "await aevaluate(\n"
            "    async_target_function,\n"
            "    data=data,\n"
            "    evaluators=evaluators,\n"
            "    # ... other parameters\n"
            ")"
        )
        raise ValueError(msg)
    elif experiment and experiment_prefix:
        msg = (
            "Expected at most one of 'experiment' or 'experiment_prefix',"
            " but both were provided. "
            f"Got: experiment={experiment}, experiment_prefix={experiment_prefix}"
        )
        raise ValueError(msg)
    else:
        if not upload_results:
            _warn_once("'upload_results' parameter is in beta.")
        logger.debug("Running evaluation over target system %s...", target)
        return _evaluate(
            target,
            data=data,
            evaluators=cast(Optional[Sequence[EVALUATOR_T]], evaluators),
            summary_evaluators=summary_evaluators,
            metadata=metadata,
            experiment_prefix=experiment_prefix,
            description=description,
            max_concurrency=max_concurrency,
            num_repetitions=num_repetitions,
            client=client,
            blocking=blocking,
            experiment=experiment,
            upload_results=upload_results,
            error_handling=error_handling,
        )

437 

438 

def evaluate_existing(
    experiment: Union[str, uuid.UUID, schemas.TracerSession],
    /,
    evaluators: Optional[Sequence[EVALUATOR_T]] = None,
    summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
    metadata: Optional[dict] = None,
    max_concurrency: Optional[int] = 0,
    client: Optional[langsmith.Client] = None,
    load_nested: bool = False,
    blocking: bool = True,
) -> ExperimentResults:
    r"""Run evaluators over the runs of an experiment that already exists.

    The experiment's stored runs are loaded and paired with the examples they
    reference, then handed to the evaluators.

    Args:
        experiment (Union[str, uuid.UUID, schemas.TracerSession]): The experiment
            to evaluate.
        evaluators (Optional[Sequence[EVALUATOR_T]]): Evaluators applied to each
            individual run.
        summary_evaluators (Optional[Sequence[SUMMARY_EVALUATOR_T]]): Evaluators
            applied over the entire dataset.
        metadata (Optional[dict]): Metadata to include in the evaluation results.
        max_concurrency (int | None): The maximum number of concurrent
            evaluations to run.

            If `None` then no limit is set. If `0` then no concurrency.
        client (Optional[langsmith.Client]): LangSmith client to use. When omitted,
            a cached client is used.
        load_nested: Whether to load all child runs for the experiment.

            Default is to only load the top-level root runs.
        blocking (bool): Whether to block until evaluation is complete.

    Returns:
        The evaluation results.

    Environment:
        - `LANGSMITH_TEST_CACHE`: If set, API calls will be cached to disk to save time and
            cost during testing.

            Recommended to commit the cache files to your repository for faster CI/CD runs.

            Requires the `'langsmith[vcr]'` package to be installed.

    Examples:
        Define your evaluators

        >>> from typing import Sequence
        >>> from langsmith.schemas import Example, Run
        >>> def accuracy(run: Run, example: Example):
        ...     # Row-level evaluator for accuracy.
        ...     pred = run.outputs["output"]
        ...     expected = example.outputs["answer"]
        ...     return {"score": expected.lower() == pred.lower()}
        >>> def precision(runs: Sequence[Run], examples: Sequence[Example]):
        ...     # Experiment-level evaluator for precision.
        ...     # TP / (TP + FP)
        ...     predictions = [run.outputs["output"].lower() for run in runs]
        ...     expected = [example.outputs["answer"].lower() for example in examples]
        ...     # yes and no are the only possible answers
        ...     tp = sum([p == e for p, e in zip(predictions, expected) if p == "yes"])
        ...     fp = sum([p == "yes" and e == "no" for p, e in zip(predictions, expected)])
        ...     return {"score": tp / (tp + fp)}

        Load the experiment and run the evaluation.

        >>> import uuid
        >>> from langsmith import Client
        >>> from langsmith.evaluation import evaluate, evaluate_existing
        >>> client = Client()
        >>> dataset_name = "__doctest_evaluate_existing_" + uuid.uuid4().hex[:8]
        >>> dataset = client.create_dataset(dataset_name)
        >>> example = client.create_example(
        ...     inputs={"question": "What is 2+2?"},
        ...     outputs={"answer": "4"},
        ...     dataset_id=dataset.id,
        ... )
        >>> def predict(inputs: dict) -> dict:
        ...     return {"output": "4"}
        >>> # First run inference on the dataset
        ... results = evaluate(
        ...     predict, data=dataset_name, experiment_prefix="doctest_experiment"
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...
        >>> experiment_id = results.experiment_name
        >>> # Wait for the experiment to be fully processed and check if we have results
        >>> len(results) > 0
        True
        >>> import time
        >>> time.sleep(2)
        >>> results = evaluate_existing(
        ...     experiment_id,
        ...     evaluators=[accuracy],
        ...     summary_evaluators=[precision],
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...
        >>> client.delete_dataset(dataset_id=dataset.id)
    """  # noqa: E501
    if not client:
        client = rt.get_cached_client(timeout_ms=(20_000, 90_001))
    session = _load_experiment(experiment, client)
    traces = _load_traces(experiment, client, load_nested=load_nested)
    examples_by_id = _load_examples_map(client, session)
    # Line up one example per run, in run order, keyed by the run's
    # reference example id.
    matched_examples = []
    for trace in traces:
        example_id = cast(uuid.UUID, trace.reference_example_id)
        matched_examples.append(examples_by_id[example_id])
    return _evaluate(
        traces,
        data=matched_examples,
        evaluators=evaluators,
        summary_evaluators=summary_evaluators,
        metadata=metadata,
        max_concurrency=max_concurrency,
        client=client,
        blocking=blocking,
        experiment=session,
    )

549 

550 

class ExperimentResultRow(TypedDict):
    """A single row of experiment results, as yielded by `ExperimentResults`."""

    # The run for this row.
    run: schemas.Run
    # The example paired with the run.
    example: schemas.Example
    # The evaluation results attached to this row.
    evaluation_results: EvaluationResults

555 

556 

class ExperimentResults:
    """Represents the results of an evaluate() call.

    This class provides an iterator interface to iterate over the experiment results
    as they become available. It also provides access to the experiment name,
    the number of results, and a way to wait for the results to be processed.

    Properties:
        experiment_name -> str: The name of the experiment.

    Methods:
        wait() -> None: Waits for the experiment data to be processed.
    """

    def __init__(self, experiment_manager: _ExperimentManager, blocking: bool = True):
        """Start consuming results from the experiment manager.

        Args:
            experiment_manager: Source of the result rows and summary scores.
            blocking: If `True`, all results are processed before the constructor
                returns. If `False`, processing runs in a background thread.
        """
        self._manager = experiment_manager
        self._results: list[ExperimentResultRow] = []
        # The queue is used by `__iter__` to wait for new rows.
        self._queue: queue.Queue[ExperimentResultRow] = queue.Queue()
        self._processing_complete = threading.Event()
        if not blocking:
            self._thread: Optional[threading.Thread] = threading.Thread(
                target=self._process_data
            )
            self._thread.start()
        else:
            self._thread = None
            self._process_data()

    @property
    def experiment_name(self) -> str:
        """Name of the experiment, as reported by the experiment manager."""
        return self._manager.experiment_name

    def __iter__(self) -> Iterator[ExperimentResultRow]:
        """Yield result rows in order, waiting for new ones while processing runs."""
        ix = 0
        while (
            not self._processing_complete.is_set()
            or not self._queue.empty()
            or ix < len(self._results)
        ):
            try:
                if ix < len(self._results):
                    yield self._results[ix]
                    ix += 1
                else:
                    # Nothing new to yield yet: block briefly on the queue. The
                    # dequeued item is discarded because rows are read from
                    # `self._results`.
                    self._queue.get(block=True, timeout=0.1)
            except queue.Empty:
                continue

    def _process_data(self) -> None:
        """Drain the manager's results, then record the summary scores.

        The completion event is set in a `finally` block so that iterators do
        not wait forever if loading or evaluating raises; the exception itself
        still propagates to the caller (or the background thread).
        """
        try:
            tqdm = _load_tqdm()
            results = self._manager.get_results()
            for item in tqdm(results):
                self._queue.put(item)
                self._results.append(item)

            summary_scores = self._manager.get_summary_scores()
            self._summary_results = summary_scores
        finally:
            self._processing_complete.set()

    def __len__(self) -> int:
        """Return the number of result rows processed so far."""
        return len(self._results)

    def to_pandas(
        self, start: Optional[int] = 0, end: Optional[int] = None
    ) -> DataFrame:
        """Convert the rows processed so far to a DataFrame via `_to_pandas`.

        Args:
            start: Forwarded to `_to_pandas` as `start`.
            end: Forwarded to `_to_pandas` as `end`.
        """
        return _to_pandas(self._results, start=start, end=end)

    def _repr_html_(self) -> str:
        """Return an HTML table when pandas is available, else the plain repr."""
        import importlib.util

        if self._results and importlib.util.find_spec("pandas"):
            df = self.to_pandas()
            return df._repr_html_()  # type: ignore[operator]
        else:
            return self.__repr__()

    def __repr__(self) -> str:
        return f"<ExperimentResults {self.experiment_name}>"

    def wait(self) -> None:
        """Wait for the evaluation runner to complete.

        This method blocks the current thread until the evaluation runner has
        finished its execution.
        """
        if self._thread:
            self._thread.join()

643 

644 

## Public API for Comparison Experiments

# Row-level evaluator for comparative experiments: receives a sequence of runs
# plus an optional example, and returns a `ComparisonEvaluationResult` or a
# dict, either directly or as an awaitable.
COMPARATIVE_EVALUATOR_T = Callable[
    [Sequence[schemas.Run], Optional[schemas.Example]],
    Union[
        Union[ComparisonEvaluationResult, dict],
        Awaitable[Union[ComparisonEvaluationResult, dict]],
    ],
]

655 

656 

657def evaluate_comparative( 

658 experiments: tuple[EXPERIMENT_T, EXPERIMENT_T], 

659 /, 

660 evaluators: Sequence[COMPARATIVE_EVALUATOR_T], 

661 experiment_prefix: Optional[str] = None, 

662 description: Optional[str] = None, 

663 max_concurrency: int = 5, 

664 client: Optional[langsmith.Client] = None, 

665 metadata: Optional[dict] = None, 

666 load_nested: bool = False, 

667 randomize_order: bool = False, 

668) -> ComparativeExperimentResults: 

669 r"""Evaluate existing experiment runs against each other. 

670 

671 This lets you use pairwise preference scoring to generate more 

672 reliable feedback in your experiments. 

673 

674 Args: 

675 experiments (Tuple[Union[str, uuid.UUID], Union[str, uuid.UUID]]): 

676 The identifiers of the experiments to compare. 

677 evaluators (Sequence[COMPARATIVE_EVALUATOR_T]): 

678 A list of evaluators to run on each example. 

679 experiment_prefix (Optional[str]): A prefix to provide for your experiment name. 

680 description (Optional[str]): A free-form text description for the experiment. 

681 max_concurrency (int): The maximum number of concurrent evaluations to run. 

682 client (Optional[langsmith.Client]): The LangSmith client to use. 

683 metadata (Optional[dict]): Metadata to attach to the experiment. 

684 load_nested (bool): Whether to load all child runs for the experiment. 

685 

686 Default is to only load the top-level root runs. 

687 randomize_order (bool): Whether to randomize the order of the outputs for each evaluation. 

688 

689 Returns: 

690 The results of the comparative evaluation. 

691 

692 Examples: 

693 Suppose you want to compare two prompts to see which one is more effective. 

694 You would first prepare your dataset: 

695 

696 >>> from typing import Sequence 

697 >>> from langsmith import Client 

698 >>> from langsmith.evaluation import evaluate 

699 >>> from langsmith.schemas import Example, Run 

700 >>> client = Client() 

701 >>> dataset = client.clone_public_dataset( 

702 ... "https://smith.langchain.com/public/419dcab2-1d66-4b94-8901-0357ead390df/d" 

703 ... ) 

704 >>> dataset_name = "Evaluate Examples" 

705 

706 Then you would run your different prompts: 

707 >>> import functools 

708 >>> import openai 

709 >>> from langsmith.evaluation import evaluate 

710 >>> from langsmith.wrappers import wrap_openai 

711 >>> oai_client = openai.Client() 

712 >>> wrapped_client = wrap_openai(oai_client) 

713 >>> prompt_1 = "You are a helpful assistant." 

714 >>> prompt_2 = "You are an exceedingly helpful assistant." 

715 >>> def predict(inputs: dict, prompt: str) -> dict: 

716 ... completion = wrapped_client.chat.completions.create( 

717 ... model="gpt-4o-mini", 

718 ... messages=[ 

719 ... {"role": "system", "content": prompt}, 

720 ... { 

721 ... "role": "user", 

722 ... "content": f"Context: {inputs['context']}" 

723 ... f"\n\ninputs['question']", 

724 ... }, 

725 ... ], 

726 ... ) 

727 ... return {"output": completion.choices[0].message.content} 

728 >>> results_1 = evaluate( 

729 ... functools.partial(predict, prompt=prompt_1), 

730 ... data=dataset_name, 

731 ... description="Evaluating our basic system prompt.", 

732 ... blocking=False, # Run these experiments in parallel 

733 ... ) # doctest: +ELLIPSIS 

734 View the evaluation results for experiment:... 

735 >>> results_2 = evaluate( 

736 ... functools.partial(predict, prompt=prompt_2), 

737 ... data=dataset_name, 

738 ... description="Evaluating our advanced system prompt.", 

739 ... blocking=False, 

740 ... ) # doctest: +ELLIPSIS 

741 View the evaluation results for experiment:... 

742 >>> results_1.wait() 

743 >>> results_2.wait() 

744 

745 Finally, you would compare the two prompts directly: 

746 >>> import json 

747 >>> from langsmith.evaluation import evaluate_comparative 

748 >>> from langsmith import schemas 

749 >>> def score_preferences(runs: list, example: schemas.Example): 

750 ... assert len(runs) == 2 # Comparing 2 systems 

751 ... assert isinstance(example, schemas.Example) 

752 ... assert all(run.reference_example_id == example.id for run in runs) 

753 ... pred_a = runs[0].outputs["output"] if runs[0].outputs else "" 

754 ... pred_b = runs[1].outputs["output"] if runs[1].outputs else "" 

755 ... ground_truth = example.outputs["answer"] if example.outputs else "" 

756 ... tools = [ 

757 ... { 

758 ... "type": "function", 

759 ... "function": { 

760 ... "name": "rank_preferences", 

761 ... "description": "Saves the prefered response ('A' or 'B')", 

762 ... "parameters": { 

763 ... "type": "object", 

764 ... "properties": { 

765 ... "reasoning": { 

766 ... "type": "string", 

767 ... "description": "The reasoning behind the choice.", 

768 ... }, 

769 ... "preferred_option": { 

770 ... "type": "string", 

771 ... "enum": ["A", "B"], 

772 ... "description": "The preferred option, either 'A' or 'B'", 

773 ... }, 

774 ... }, 

775 ... "required": ["preferred_option"], 

776 ... }, 

777 ... }, 

778 ... } 

779 ... ] 

780 ... completion = openai.Client().chat.completions.create( 

781 ... model="gpt-4o-mini", 

782 ... messages=[ 

783 ... {"role": "system", "content": "Select the better response."}, 

784 ... { 

785 ... "role": "user", 

786 ... "content": f"Option A: {pred_a}" 

787 ... f"\n\nOption B: {pred_b}" 

788 ... f"\n\nGround Truth: {ground_truth}", 

789 ... }, 

790 ... ], 

791 ... tools=tools, 

792 ... tool_choice={ 

793 ... "type": "function", 

794 ... "function": {"name": "rank_preferences"}, 

795 ... }, 

796 ... ) 

797 ... tool_args = completion.choices[0].message.tool_calls[0].function.arguments 

798 ... loaded_args = json.loads(tool_args) 

799 ... preference = loaded_args["preferred_option"] 

800 ... comment = loaded_args["reasoning"] 

801 ... if preference == "A": 

802 ... return { 

803 ... "key": "ranked_preference", 

804 ... "scores": {runs[0].id: 1, runs[1].id: 0}, 

805 ... "comment": comment, 

806 ... } 

807 ... else: 

808 ... return { 

809 ... "key": "ranked_preference", 

810 ... "scores": {runs[0].id: 0, runs[1].id: 1}, 

811 ... "comment": comment, 

812 ... } 

813 >>> def score_length_difference(runs: list, example: schemas.Example): 

814 ... # Just return whichever response is longer. 

815 ... # Just an example, not actually useful in real life. 

816 ... assert len(runs) == 2 # Comparing 2 systems 

817 ... assert isinstance(example, schemas.Example) 

818 ... assert all(run.reference_example_id == example.id for run in runs) 

819 ... pred_a = runs[0].outputs["output"] if runs[0].outputs else "" 

820 ... pred_b = runs[1].outputs["output"] if runs[1].outputs else "" 

821 ... if len(pred_a) > len(pred_b): 

822 ... return { 

823 ... "key": "length_difference", 

824 ... "scores": {runs[0].id: 1, runs[1].id: 0}, 

825 ... } 

826 ... else: 

827 ... return { 

828 ... "key": "length_difference", 

829 ... "scores": {runs[0].id: 0, runs[1].id: 1}, 

830 ... } 

831 >>> results = evaluate_comparative( 

832 ... [results_1.experiment_name, results_2.experiment_name], 

833 ... evaluators=[score_preferences, score_length_difference], 

834 ... client=client, 

835 ... ) # doctest: +ELLIPSIS 

836 View the pairwise evaluation results at:... 

837 >>> eval_results = list(results) 

838 >>> assert len(eval_results) >= 10 # doctest: +SKIP 

839 >>> assert all( 

840 ... "feedback.ranked_preference" in r["evaluation_results"] 

841 ... for r in eval_results 

842 ... ) # doctest: +SKIP 

843 >>> assert all( 

844 ... "feedback.length_difference" in r["evaluation_results"] 

845 ... for r in eval_results 

846 ... ) # doctest: +SKIP 

847 """ # noqa: E501 

848 if len(experiments) < 2: 

849 raise ValueError("Comparative evaluation requires at least 2 experiments.") 

850 if not evaluators: 

851 raise ValueError( 

852 "At least one evaluator is required for comparative evaluation." 

853 ) 

854 if max_concurrency < 0: 

855 raise ValueError("max_concurrency must be a positive integer.") 

856 client = client or rt.get_cached_client() 

857 

858 # TODO: Add information about comparison experiments 

859 projects = [_load_experiment(experiment, client) for experiment in experiments] 

860 ref_datasets_ = [str(p.reference_dataset_id) for p in projects] 

861 if not len(set(ref_datasets_)) == 1: 

862 raise ValueError("All experiments must have the same reference dataset.") 

863 experiment_ids = [p.id for p in projects] 

864 if experiment_prefix is None: 

865 experiment_names = [p.name for p in projects if p.name is not None] 

866 experiment_name = ( 

867 " vs. ".join(experiment_names) + "-" + str(uuid.uuid4().hex[:4]) 

868 ) 

869 else: 

870 experiment_name = experiment_prefix + "-" + str(uuid.uuid4().hex[:8]) 

871 comparative_experiment_id = uuid.uuid4() 

872 comparative_experiment = client.create_comparative_experiment( 

873 experiment_name, 

874 experiments=experiment_ids, 

875 description=description, 

876 metadata=metadata, 

877 id=comparative_experiment_id, 

878 ) 

879 _print_comparative_experiment_start( 

880 cast( 

881 tuple[schemas.TracerSessionResult, schemas.TracerSessionResult], 

882 tuple(projects), 

883 ), 

884 comparative_experiment, 

885 ) 

886 runs = [ 

887 _load_traces(experiment, client, load_nested=load_nested) 

888 for experiment in experiments 

889 ] 

890 # Only check intersections for the experiments 

891 examples_intersection = None 

892 for runs_list in runs: 

893 example_ids_set = {run.reference_example_id for run in runs_list} 

894 if examples_intersection is None: 

895 examples_intersection = example_ids_set 

896 else: 

897 examples_intersection &= example_ids_set 

898 example_ids_nullable = ( 

899 list(examples_intersection) if examples_intersection is not None else [] 

900 ) 

901 example_ids = [eid for eid in example_ids_nullable if eid is not None] 

902 # TODO: Warn if different dataset versions, etc. are used in the different 

903 # experiments. We aren't providing any training wheels here. 

904 batch_size = 99 

905 data = {} 

906 for i in range(0, len(example_ids), batch_size): 

907 example_ids_batch = example_ids[i : i + batch_size] 

908 for e in client.list_examples( 

909 dataset_id=projects[0].reference_dataset_id, 

910 as_of=projects[0].metadata.get("dataset_version"), 

911 example_ids=example_ids_batch, 

912 ): 

913 data[e.id] = e 

914 runs_dict: dict[uuid.UUID, list[schemas.Run]] = collections.defaultdict(list) 

915 for runs_list in runs: 

916 for run in runs_list: 

917 if run.reference_example_id in data: 

918 runs_dict[cast(uuid.UUID, run.reference_example_id)].append(run) 

919 

920 comparators = [comparison_evaluator(evaluator) for evaluator in evaluators or []] 

921 results: dict = {} 

922 

923 def evaluate_and_submit_feedback( 

924 runs_list: list[schemas.Run], 

925 example: schemas.Example, 

926 comparator: DynamicComparisonRunEvaluator, 

927 executor: cf.Executor, 

928 ) -> tuple[uuid.UUID, ComparisonEvaluationResult]: 

929 feedback_group_id = uuid.uuid4() 

930 if randomize_order: 

931 random.shuffle(runs_list) 

932 with rh.tracing_context(project_name="evaluators", client=client): 

933 result = comparator.compare_runs(runs_list, example) 

934 if client is None: 

935 raise ValueError("Client is required to submit feedback.") 

936 comments = ( 

937 {str(rid): result.comment for rid in result.scores} 

938 if isinstance(result.comment, str) 

939 else (result.comment or {}) 

940 ) 

941 for run_id, score in result.scores.items(): 

942 executor.submit( 

943 client.create_feedback, 

944 run_id=run_id, 

945 key=result.key, 

946 score=score, 

947 comment=comments.get(str(run_id)), 

948 comparative_experiment_id=comparative_experiment.id, 

949 source_run_id=result.source_run_id, 

950 feedback_group_id=feedback_group_id, 

951 ) 

952 return example.id, result 

953 

954 tqdm = _load_tqdm() 

955 with ls_utils.ContextThreadPoolExecutor( 

956 max_workers=max_concurrency or 1 

957 ) as executor: 

958 futures = [] 

959 for example_id, runs_list in tqdm(runs_dict.items()): 

960 results[example_id] = {"runs": runs_list} 

961 for comparator in comparators: 

962 if max_concurrency > 1: 

963 future = executor.submit( 

964 evaluate_and_submit_feedback, 

965 runs_list, 

966 data[example_id], 

967 comparator, 

968 executor, 

969 ) 

970 futures.append(future) 

971 else: 

972 _, result = evaluate_and_submit_feedback( 

973 runs_list, data[example_id], comparator, executor 

974 ) 

975 results[example_id][f"feedback.{result.key}"] = result 

976 if futures: 

977 cf.wait(futures) 

978 for future in futures: 

979 example_id, result = future.result() 

980 results[example_id][f"feedback.{result.key}"] = result 

981 

982 return ComparativeExperimentResults(results, data) 

983 

984 

class ComparativeExperimentResults:
    """Hold the results of an ``evaluate_comparative()`` call.

    The object wraps a mapping of results and, optionally, a mapping of
    examples that shares the same keys. Indexing returns the stored result
    for a key; iterating yields one row per stored result, paired with the
    matching example when examples were supplied.
    """

    def __init__(
        self,
        results: dict,
        examples: Optional[dict[uuid.UUID, schemas.Example]] = None,
    ):
        self._results = results
        self._examples = examples

    def __getitem__(self, key):
        """Return the result associated with the given key."""
        return self._results[key]

    def __iter__(self):
        """Yield ``{"example": ..., "evaluation_results": ...}`` rows."""
        for key, evaluation_results in self._results.items():
            # Without an examples mapping (None or empty) the example is None.
            if self._examples:
                example = self._examples[key]
            else:
                example = None
            yield {"example": example, "evaluation_results": evaluation_results}

1015 

1016 

1017## Private API 

1018 

1019 

1020def _print_comparative_experiment_start( 

1021 experiments: tuple[schemas.TracerSession, schemas.TracerSession], 

1022 comparative_experiment: schemas.ComparativeExperiment, 

1023) -> None: 

1024 url = experiments[0].url or experiments[1].url 

1025 if url: 

1026 project_url = url.split("?")[0] 

1027 dataset_id = comparative_experiment.reference_dataset_id 

1028 base_url = project_url.split("/projects/p/")[0] 

1029 comparison_url = ( 

1030 f"{base_url}/datasets/{dataset_id}/compare?" 

1031 f"selectedSessions={'%2C'.join([str(e.id) for e in experiments])}" 

1032 f"&comparativeExperiment={comparative_experiment.id}" 

1033 ) 

1034 print( # noqa: T201 

1035 f"View the pairwise evaluation results at:\n{comparison_url}\n\n" 

1036 ) 

1037 

1038 

1039def _is_callable(target: Union[TARGET_T, Iterable[schemas.Run], Runnable]) -> bool: 

1040 return callable(target) or _is_langchain_runnable(target) 

1041 

1042 

def _evaluate(
    target: Union[TARGET_T, Iterable[schemas.Run], Runnable],
    /,
    data: DATA_T,
    evaluators: Optional[Sequence[EVALUATOR_T]] = None,
    summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
    metadata: Optional[dict] = None,
    experiment_prefix: Optional[str] = None,
    description: Optional[str] = None,
    max_concurrency: Optional[int] = None,
    num_repetitions: int = 1,
    client: Optional[langsmith.Client] = None,
    blocking: bool = True,
    experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]] = None,
    upload_results: bool = True,
    error_handling: Literal["log", "ignore"] = "log",
) -> ExperimentResults:
    """Build the experiment pipeline for ``target`` and return its results.

    A started ``_ExperimentManager`` is created, then predictions (only when
    ``target`` is callable), evaluators, and summary evaluators are layered on
    in that order. The pipeline is assembled and handed to
    ``ExperimentResults`` inside the optional request-cache context.
    """
    client = client or rt.get_cached_client()
    target_is_callable = _is_callable(target)
    # A non-callable target is treated as an iterable of existing runs.
    existing_runs = (
        None if target_is_callable else cast(Iterable[schemas.Run], target)
    )
    experiment_, existing_runs = _resolve_experiment(
        experiment, existing_runs, client
    )

    manager = _ExperimentManager(
        data,
        client=client,
        metadata=metadata,
        experiment=experiment_ or experiment_prefix,
        description=description,
        num_repetitions=num_repetitions,
        runs=existing_runs,
        include_attachments=_include_attachments(target, evaluators),
        upload_results=upload_results,
        error_handling=error_handling,
    ).start()

    cache_dir = ls_utils.get_cache_dir(None)
    if cache_dir:
        cache_path = pathlib.Path(cache_dir) / f"{manager.dataset_id}.yaml"
    else:
        cache_path = None

    with ls_utils.with_optional_cache(cache_path, ignore_hosts=[client.api_url]):
        if target_is_callable:
            # Add predictions to the experiment.
            manager = manager.with_predictions(
                cast(TARGET_T, target), max_concurrency=max_concurrency
            )
        if evaluators:
            # Apply evaluators to the predictions.
            manager = manager.with_evaluators(
                evaluators, max_concurrency=max_concurrency
            )
        if summary_evaluators:
            # Apply the experiment-level summary evaluators.
            manager = manager.with_summary_evaluators(summary_evaluators)
        # Start consuming the results.
        return ExperimentResults(manager, blocking=blocking)

1100 

1101 

1102def _is_uuid(value: str) -> bool: 

1103 try: 

1104 uuid.UUID(value) 

1105 return True 

1106 except ValueError: 

1107 return False 

1108 

1109 

def _load_experiment(
    project: EXPERIMENT_T, client: langsmith.Client
) -> schemas.TracerSession:
    """Resolve ``project`` to a tracer session.

    A ``TracerSession`` is returned as-is. A UUID (or UUID-formatted string)
    is looked up by project ID; any other value is looked up by project name.
    """
    if isinstance(project, schemas.TracerSession):
        return project
    looks_like_id = isinstance(project, uuid.UUID) or _is_uuid(project)
    lookup_key = "project_id" if looks_like_id else "project_name"
    return client.read_project(**{lookup_key: project})

1119 

1120 

def _load_traces(
    project: Union[str, uuid.UUID, schemas.TracerSession],
    client: langsmith.Client,
    load_nested: bool = False,
) -> list[schemas.Run]:
    """Load the runs of a project, optionally with their child runs attached.

    With ``load_nested=False`` only root runs are requested and returned as a
    list. With ``load_nested=True`` every run is requested; the runs without
    a parent are returned, and each parent's ``child_runs`` is set to its
    children sorted by ``dotted_order``.
    """
    is_root = None if load_nested else True
    if isinstance(project, schemas.TracerSession):
        selector = {"project_id": project.id}
    elif isinstance(project, uuid.UUID) or _is_uuid(project):
        selector = {"project_id": project}
    else:
        selector = {"project_name": project}
    runs = client.list_runs(**selector, is_root=is_root)
    if not load_nested:
        return list(runs)

    children_by_parent: collections.defaultdict[uuid.UUID, list[schemas.Run]] = (
        collections.defaultdict(list)
    )
    root_runs = []
    runs_by_id = {}
    for run in runs:
        if run.parent_run_id is None:
            root_runs.append(run)
        else:
            children_by_parent[run.parent_run_id].append(run)
        runs_by_id[run.id] = run
    for parent_id, children in children_by_parent.items():
        runs_by_id[parent_id].child_runs = sorted(
            children, key=lambda r: r.dotted_order
        )
    return root_runs

1151 

1152 

1153def _load_examples_map( 

1154 client: langsmith.Client, project: schemas.TracerSession 

1155) -> dict[uuid.UUID, schemas.Example]: 

1156 return { 

1157 e.id: e 

1158 for e in client.list_examples( 

1159 dataset_id=project.reference_dataset_id, 

1160 as_of=project.metadata.get("dataset_version"), 

1161 ) 

1162 } 

1163 

1164 

1165IT = TypeVar("IT") 

1166 

1167 

1168def _load_tqdm() -> Callable[[IT], IT]: 

1169 try: 

1170 from tqdm.auto import tqdm 

1171 except ImportError: 

1172 return lambda x: x 

1173 return tqdm # type: ignore[return-value] 

1174 

1175 

1176ET = TypeVar("ET", bound="_ExperimentManagerMixin") 

1177 

1178 

1179class _ExperimentManagerMixin: 

1180 def __init__( 

1181 self, 

1182 /, 

1183 experiment: Optional[Union[schemas.TracerSession, str]], 

1184 metadata: Optional[dict] = None, 

1185 client: Optional[langsmith.Client] = None, 

1186 description: Optional[str] = None, 

1187 ): 

1188 self.client = client or rt.get_cached_client() 

1189 self._experiment: Optional[schemas.TracerSession] = None 

1190 if experiment is None: 

1191 self._experiment_name = _get_random_name() 

1192 elif isinstance(experiment, str): 

1193 self._experiment_name = experiment + "-" + str(uuid.uuid4().hex[:8]) 

1194 else: 

1195 self._experiment_name = cast(str, experiment.name) 

1196 self._experiment = experiment 

1197 

1198 metadata = metadata or {} 

1199 if not metadata.get("revision_id"): 

1200 metadata = { 

1201 "revision_id": ls_env.get_langchain_env_var_metadata().get( 

1202 "revision_id" 

1203 ), 

1204 **metadata, 

1205 } 

1206 self._metadata = metadata or {} 

1207 self._description = description 

1208 

1209 @property 

1210 def experiment_name(self) -> str: 

1211 if self._experiment_name is not None: 

1212 return self._experiment_name 

1213 raise ValueError( 

1214 "Experiment name not provided, and experiment not yet started." 

1215 ) 

1216 

1217 def _get_experiment(self) -> schemas.TracerSession: 

1218 if self._experiment is None: 

1219 raise ValueError("Experiment not started yet.") 

1220 return self._experiment 

1221 

1222 def _get_experiment_metadata(self): 

1223 project_metadata = self._metadata or {} 

1224 project_metadata["__ls_runner"] = "py_sdk_evaluate" 

1225 git_info = ls_env.get_git_info() 

1226 if git_info: 

1227 project_metadata = { 

1228 **project_metadata, 

1229 "git": git_info, 

1230 } 

1231 if self._experiment: 

1232 project_metadata = { 

1233 **self._experiment.metadata, 

1234 **project_metadata, 

1235 } 

1236 return project_metadata 

1237 

1238 def _create_experiment( 

1239 self, dataset_id: uuid.UUID, metadata: dict 

1240 ) -> schemas.TracerSession: 

1241 # There is a chance of name collision, so we'll retry 

1242 starting_name = self._experiment_name 

1243 num_attempts = 10 

1244 for _ in range(num_attempts): 

1245 try: 

1246 return self.client.create_project( 

1247 self._experiment_name, 

1248 description=self._description, 

1249 reference_dataset_id=dataset_id, 

1250 metadata=metadata, 

1251 ) 

1252 except ls_utils.LangSmithConflictError: 

1253 self._experiment_name = f"{starting_name}-{str(uuid.uuid4().hex[:6])}" 

1254 raise ValueError( 

1255 f"Could not find a unique experiment name in {num_attempts} attempts." 

1256 " Please try again with a different experiment name." 

1257 ) 

1258 

1259 def _get_project(self, first_example: schemas.Example) -> schemas.TracerSession: 

1260 if self._experiment is None: 

1261 project_metadata = self._get_experiment_metadata() 

1262 project = self._create_experiment( 

1263 first_example.dataset_id, project_metadata 

1264 ) 

1265 else: 

1266 project = self._experiment 

1267 return project 

1268 

1269 def _print_experiment_start( 

1270 self, project: Optional[schemas.TracerSession], first_example: schemas.Example 

1271 ) -> None: 

1272 if project and project.url: 

1273 # TODO: Make this a public API 

1274 project_url = project.url.split("?")[0] 

1275 dataset_id = first_example.dataset_id 

1276 base_url = project_url.split("/projects/p/")[0] 

1277 comparison_url = ( 

1278 f"{base_url}/datasets/{dataset_id}/compare?" 

1279 f"selectedSessions={project.id}" 

1280 ) 

1281 print( # noqa: T201 

1282 f"View the evaluation results for experiment: '{self.experiment_name}'" 

1283 f" at:\n{comparison_url}\n\n" 

1284 ) 

1285 else: 

1286 # HACKHACK 

1287 print( # noqa: T201 

1288 "Starting evaluation of experiment: %s", self.experiment_name 

1289 ) 

1290 

1291 

1292class _ExperimentManager(_ExperimentManagerMixin): 

1293 """Manage the execution of experiments. 

1294 

1295 Supports lazily running predictions and evaluations in parallel to facilitate 

1296 result streaming and early debugging. 

1297 

1298 Args: 

1299 data (DATA_T): The data used for the experiment. Can be a dataset name or ID OR 

1300 a generator of examples. 

1301 num_repetitions (int): The number of times to run over the data. 

1302 runs (Optional[Iterable[schemas.Run]]): The runs associated with the experiment 

1303 predictions. 

1304 experiment (Optional[schemas.TracerSession]): The tracer session 

1305 associated with the experiment. 

1306 experiment_prefix (Optional[str]): The prefix for the experiment name. 

1307 metadata (Optional[dict]): Additional metadata for the experiment. 

1308 client (Optional[langsmith.Client]): The Langsmith client used for 

1309 the experiment. 

1310 evaluation_results (Optional[Iterable[EvaluationResults]]): The evaluation 

1311 results for the experiment. 

1312 summary_results (Optional[Iterable[EvaluationResults]]): The aggregate results 

1313 for the experiment. 

1314 """ 

1315 

def __init__(
    self,
    data: DATA_T,
    /,
    experiment: Optional[Union[schemas.TracerSession, str]],
    metadata: Optional[dict] = None,
    client: Optional[langsmith.Client] = None,
    runs: Optional[Iterable[schemas.Run]] = None,
    evaluation_results: Optional[Iterable[EvaluationResults]] = None,
    summary_results: Optional[Iterable[EvaluationResults]] = None,
    description: Optional[str] = None,
    num_repetitions: int = 1,
    include_attachments: bool = False,
    reuse_attachments: bool = False,
    upload_results: bool = True,
    attachment_raw_data_dict: Optional[dict] = None,
    error_handling: Literal["log", "ignore"] = "log",
):
    """Record the experiment inputs and options.

    Experiment name, metadata, client, and description are handled by the
    parent initializer. The examples stream starts out unset and is
    resolved on first access of ``examples``.
    """
    super().__init__(
        experiment=experiment,
        metadata=metadata,
        client=client,
        description=description,
    )
    # Data source and the example stream resolved from it on demand.
    self._data = data
    self._examples: Optional[Iterable[schemas.Example]] = None
    self._num_repetitions = num_repetitions

    # Streams produced by earlier pipeline stages, if any.
    self._runs = runs
    self._evaluation_results = evaluation_results
    self._summary_results = summary_results

    # Attachment handling options.
    self._include_attachments = include_attachments
    self._reuse_attachments = reuse_attachments
    self._attachment_raw_data_dict = attachment_raw_data_dict

    # Upload and error-handling behaviour.
    self._upload_results = upload_results
    self._error_handling = error_handling

1351 

1352 def _reset_example_attachment_readers( 

1353 self, example: schemas.Example 

1354 ) -> schemas.Example: 

1355 """Reset attachment readers for an example. 

1356 

1357 This is only in the case that an attachment is going to be used by more 

1358 than 1 callable (target + evaluators). In that case we keep a single copy 

1359 of the attachment data in `self._attachment_raw_data_dict`, and create 

1360 readers from that data. This makes it so that we don't have to keep 

1361 copies of the same data in memory, instead we can just create readers 

1362 from the same data. 

1363 """ 

1364 if not hasattr(example, "attachments") or not example.attachments: 

1365 return example 

1366 

1367 new_attachments: dict[str, schemas.AttachmentInfo] = {} 

1368 for name, attachment in example.attachments.items(): 

1369 if ( 

1370 self._attachment_raw_data_dict is not None 

1371 and str(example.id) + name in self._attachment_raw_data_dict 

1372 ): 

1373 new_attachments[name] = { 

1374 "presigned_url": attachment["presigned_url"], 

1375 "reader": io.BytesIO( 

1376 self._attachment_raw_data_dict[str(example.id) + name] 

1377 ), 

1378 "mime_type": attachment["mime_type"], 

1379 } 

1380 else: 

1381 new_attachments[name] = attachment 

1382 

1383 # Create a new Example instance with the updated attachments 

1384 return schemas.Example( 

1385 id=example.id, 

1386 created_at=example.created_at, 

1387 dataset_id=example.dataset_id, 

1388 inputs=example.inputs, 

1389 outputs=example.outputs, 

1390 metadata=example.metadata, 

1391 modified_at=example.modified_at, 

1392 source_run_id=example.source_run_id, 

1393 attachments=new_attachments, 

1394 _host_url=example._host_url, 

1395 _tenant_id=example._tenant_id, 

1396 ) 

1397 

@property
def examples(self) -> Iterable[schemas.Example]:
    """Return a fresh iterator over the experiment's examples.

    On first access the examples are resolved from ``self._data``. Every
    access tees the stored stream, keeping one branch on the manager and
    returning the other, so each caller gets its own iterator.
    """
    if self._examples is None:
        self._examples = _resolve_data(
            self._data,
            client=self.client,
            include_attachments=self._include_attachments,
        )
        if self._reuse_attachments and self._attachment_raw_data_dict is None:
            # Read every attachment once and keep the raw bytes, keyed by
            # str(example id) + attachment name, so readers can be rebuilt.
            examples_copy, self._examples = itertools.tee(self._examples)
            self._attachment_raw_data_dict = {
                str(e.id) + name: value["reader"].read()
                for e in examples_copy
                for name, value in (e.attachments or {}).items()
            }
        if self._num_repetitions > 1:
            # Materialize the examples, then chain one pass per repetition;
            # each pass goes through _reset_example_attachment_readers.
            examples_list = list(self._examples)
            self._examples = itertools.chain.from_iterable(
                [
                    self._reset_example_attachment_readers(example)
                    for example in examples_list
                ]
                for _ in range(self._num_repetitions)
            )
    # Keep one branch for later accesses and hand the other to the caller.
    self._examples, examples_iter = itertools.tee(self._examples)
    return examples_iter

1424 

@property
def dataset_id(self) -> str:
    """Return the dataset ID of the experiment as a string.

    Uses the experiment's ``reference_dataset_id`` when the experiment is
    set and that attribute is truthy; otherwise falls back to the
    ``dataset_id`` of the first example.
    """
    experiment = self._experiment
    if experiment is not None and getattr(
        experiment, "reference_dataset_id", None
    ):
        session = cast(schemas.TracerSessionResult, experiment)
        return str(session.reference_dataset_id)
    first_example = next(iter(self.examples))
    return str(first_example.dataset_id)

1435 

@property
def evaluation_results(self) -> Iterable[EvaluationResults]:
    """Return the stored evaluation results, or empty results per example.

    When no evaluation results were provided, a generator yielding one
    ``{"results": []}`` entry per example is returned instead.
    """
    stored = self._evaluation_results
    if stored is not None:
        return stored
    return ({"results": []} for _ in self.examples)

1441 

@property
def runs(self) -> Iterable[schemas.Run]:
    """Return a fresh iterator over the experiment's runs.

    The stored stream is teed on every access: one branch stays on the
    manager and the other is returned to the caller.

    Raises:
        ValueError: If no runs are available yet.
    """
    if self._runs is None:
        raise ValueError(
            "Runs not provided in this experiment. Please predict first."
        )
    retained, handed_out = itertools.tee(self._runs)
    self._runs = retained
    return handed_out

1450 

def start(self) -> _ExperimentManager:
    """Start the experiment and return a manager bound to it.

    Peeks at the first example, creates or reuses the experiment project
    (only when results are uploaded), prints where to find the results,
    records ``num_repetitions`` in the metadata, and returns a copy of the
    manager tied to that project.

    Raises:
        ValueError: If the data yields no examples.
    """
    # Use a default so an empty dataset does not leak a bare StopIteration.
    first_example = next(itertools.islice(self.examples, 1), None)
    if first_example is None:
        raise ValueError(
            "No examples found in the dataset. "
            "Please ensure the data provided to evaluate is not empty."
        )
    project = self._get_project(first_example) if self._upload_results else None
    self._print_experiment_start(project, first_example)
    self._metadata["num_repetitions"] = self._num_repetitions
    return self._copy(self.examples, experiment=project)

1457 

def with_predictions(
    self,
    target: TARGET_T,
    /,
    max_concurrency: Optional[int] = None,
) -> _ExperimentManager:
    """Return a manager whose examples and runs come from running ``target``.

    The prediction generator is created inside a copy of the current
    context and is consumed lazily through two teed branches.
    """
    predictions = copy_context().run(
        self._predict,
        target,
        max_concurrency=max_concurrency,
        include_attachments=_target_include_attachments(target),
    )
    # One branch feeds the examples stream, the other the runs stream.
    example_branch, run_branch = itertools.tee(predictions, 2)
    examples = (pred["example"] for pred in example_branch)
    runs = (pred["run"] for pred in run_branch)
    return self._copy(examples, runs=runs)

1476 

def with_evaluators(
    self,
    evaluators: Sequence[
        Union[
            EVALUATOR_T,
            RunEvaluator,
        ]
    ],
    *,
    max_concurrency: Optional[int] = None,
) -> _ExperimentManager:
    """Return a manager whose streams include the evaluators' results.

    The evaluators are resolved first, then the scoring generator is
    created inside a copy of the current context and split three ways so
    examples, runs, and evaluation results can be consumed independently.
    """
    resolved = _resolve_evaluators(evaluators)
    scored = copy_context().run(
        self._score, resolved, max_concurrency=max_concurrency
    )
    example_branch, run_branch, result_branch = itertools.tee(scored, 3)
    examples = (row["example"] for row in example_branch)
    runs = (row["run"] for row in run_branch)
    evaluation_results = (row["evaluation_results"] for row in result_branch)
    return self._copy(
        examples,
        runs=runs,
        evaluation_results=evaluation_results,
    )

1502 

def with_summary_evaluators(
    self,
    summary_evaluators: Sequence[SUMMARY_EVALUATOR_T],
) -> _ExperimentManager:
    """Return a manager that also carries lazily computed summary results.

    The summary evaluators are wrapped, and the generator that applies them
    is created inside a copy of the current context.
    """
    wrapped = _wrap_summary_evaluators(summary_evaluators)
    summary_results = copy_context().run(self._apply_summary_evaluators, wrapped)
    examples = self.examples
    runs = self.runs
    return self._copy(examples, runs=runs, summary_results=summary_results)

1516 

def get_results(self) -> Iterable[ExperimentResultRow]:
    """Yield one result row per (run, example, evaluation results) triple.

    The three streams are zipped, so iteration stops with the shortest one.
    """
    rows = zip(self.runs, self.examples, self.evaluation_results)
    for run, example, evaluation_results in rows:
        row = ExperimentResultRow(
            run=run,
            example=example,
            evaluation_results=evaluation_results,
        )
        yield row

1527 

def get_summary_scores(self) -> dict[str, list[dict]]:
    """Consume the summary results and return them flattened.

    Returns ``{"results": []}`` when no summary results were set.
    """
    flattened = []
    if self._summary_results is not None:
        # Iterating consumes the stored stream when it is a generator.
        for batch in self._summary_results:
            flattened.extend(batch["results"])
    return {"results": flattened}

1540 

1541 # Private methods 

1542 

def _predict(
    self,
    target: TARGET_T,
    /,
    max_concurrency: Optional[int] = None,
    include_attachments: bool = False,
) -> Generator[_ForwardResults, None, None]:
    """Run the target function on the examples.

    With ``max_concurrency == 0`` the examples are processed one by one in
    the calling thread and results are yielded in example order. Otherwise
    every example is submitted to a thread pool and results are yielded in
    completion order. ``self._end()`` runs once all results were yielded.
    """
    fn = _ensure_traceable(target)

    if max_concurrency == 0:
        # Sequential path: no executor involved.
        for example in self.examples:
            yield _forward(
                fn,
                example,
                self.experiment_name,
                self._metadata,
                self.client,
                self._upload_results,
                include_attachments,
                self._error_handling,
            )

    else:
        with ls_utils.ContextThreadPoolExecutor(max_concurrency) as executor:
            # All examples are submitted up front, before any result is yielded.
            futures = [
                executor.submit(
                    _forward,
                    fn,
                    example,
                    self.experiment_name,
                    self._metadata,
                    self.client,
                    self._upload_results,
                    include_attachments,
                    self._error_handling,
                )
                for example in self.examples
            ]
            for future in cf.as_completed(futures):
                yield future.result()
    # Close out the project.
    self._end()

1586 

def _run_evaluators(
    self,
    evaluators: Sequence[RunEvaluator],
    current_results: ExperimentResultRow,
    executor: cf.ThreadPoolExecutor,
) -> ExperimentResultRow:
    """Run every evaluator on one result row and collect the feedback.

    Evaluator results are appended to the row's ``evaluation_results`` and,
    when uploading is enabled, logged through the client using ``executor``.
    An evaluator that raises does not abort the loop: error results are
    recorded for its feedback keys where they can be extracted, and the
    failure is logged.
    """
    current_context = rh.get_tracing_context()
    # Extend the inherited tracing metadata with identifiers for this row.
    metadata = {
        **(current_context["metadata"] or {}),
        **{
            "experiment": self.experiment_name,
            "reference_example_id": current_results["example"].id,
            "reference_run_id": current_results["run"].id,
        },
    }
    with rh.tracing_context(
        **{
            **current_context,
            "project_name": "evaluators",
            "metadata": metadata,
            "enabled": "local" if not self._upload_results else True,
            "client": self.client,
        }
    ):
        run = current_results["run"]
        example = current_results["example"]
        eval_results = current_results["evaluation_results"]
        for evaluator in evaluators:
            # Generated up front so error results can reference the same ID.
            evaluator_run_id = uuid.uuid4()
            try:
                evaluator_response = evaluator.evaluate_run(  # type: ignore[call-arg]
                    run=run,
                    example=example,
                    evaluator_run_id=evaluator_run_id,
                )

                eval_results["results"].extend(
                    self.client._select_eval_results(evaluator_response)
                )
                if self._upload_results:
                    # TODO: This is a hack
                    self.client._log_evaluation_feedback(
                        evaluator_response, run=run, _executor=executor
                    )
            except Exception as e:
                try:
                    # Record an error result for each key the evaluator
                    # would have produced.
                    feedback_keys = _extract_feedback_keys(evaluator)

                    error_response = EvaluationResults(
                        results=[
                            EvaluationResult(
                                key=key,
                                source_run_id=evaluator_run_id,
                                comment=repr(e),
                                extra={"error": True},
                            )
                            for key in feedback_keys
                        ]
                    )
                    eval_results["results"].extend(
                        self.client._select_eval_results(error_response)
                    )
                    if self._upload_results:
                        # TODO: This is a hack
                        self.client._log_evaluation_feedback(
                            error_response, run=run, _executor=executor
                        )
                except Exception as e2:
                    # Best effort only: a failure here must not mask the
                    # original evaluator error logged below.
                    logger.debug(f"Error parsing feedback keys: {e2}")
                    pass
                logger.error(
                    f"Error running evaluator {repr(evaluator)} on"
                    f" run {run.id if run else ''}: {repr(e)}",
                    exc_info=True,
                )
            if example.attachments is not None:
                # Rewind the attachment readers after each evaluator.
                for attachment in example.attachments:
                    reader = example.attachments[attachment]["reader"]
                    reader.seek(0)

        return ExperimentResultRow(
            run=run,
            example=example,
            evaluation_results=eval_results,
        )

1672 

def _score(
    self,
    evaluators: Sequence[RunEvaluator],
    max_concurrency: Optional[int] = None,
) -> Iterable[ExperimentResultRow]:
    """Run the evaluators on the prediction stream.

    Expects runs to be available in the manager.
    (e.g. from a previous prediction step)

    With ``max_concurrency == 0`` rows are evaluated one at a time in the
    calling thread; otherwise each row is submitted to the executor and
    results are yielded as they complete.
    """
    with ls_utils.ContextThreadPoolExecutor(
        max_workers=max_concurrency or 1
    ) as executor:
        if max_concurrency == 0:
            # The executor is still passed along for _run_evaluators to use.
            context = copy_context()
            for current_results in self.get_results():
                yield context.run(
                    self._run_evaluators,
                    evaluators,
                    current_results,
                    executor,
                )
        else:
            futures = set()
            for current_results in self.get_results():
                futures.add(
                    executor.submit(
                        self._run_evaluators,
                        evaluators,
                        current_results,
                        executor,
                    )
                )
                try:
                    # Since prediction may be slow, yield (with a timeout) to
                    # allow for early results to be emitted.
                    for future in cf.as_completed(futures, timeout=0.001):
                        yield future.result()
                        futures.remove(future)
                except (cf.TimeoutError, TimeoutError):
                    # Nothing else finished within the timeout; keep submitting.
                    pass
            # Drain whatever is still pending after all rows were submitted.
            for future in cf.as_completed(futures):
                result = future.result()
                yield result

1717 

def _apply_summary_evaluators(
    self, summary_evaluators: Sequence[SUMMARY_EVALUATOR_T]
) -> Generator[EvaluationResults, None, None]:
    """Apply the summary evaluators to all runs and examples.

    The runs and examples are fully collected first, then each evaluator is
    called with both lists. Results are flattened into one list and, when
    uploading is enabled, submitted as project-level feedback. A failing
    evaluator is logged and skipped. A single ``{"results": [...]}`` dict
    is yielded.
    """
    runs, examples = [], []
    for run, example in zip(self.runs, self.examples):
        runs.append(run)
        examples.append(example)
    aggregate_feedback = []
    with ls_utils.ContextThreadPoolExecutor() as executor:
        project_id = self._get_experiment().id if self._upload_results else None
        current_context = rh.get_tracing_context()
        # Extend the inherited tracing metadata with the experiment identity.
        metadata = {
            **(current_context["metadata"] or {}),
            **{
                "experiment": self.experiment_name,
                "experiment_id": project_id,
            },
        }
        with rh.tracing_context(
            **{
                **current_context,
                "project_name": "evaluators",
                "metadata": metadata,
                "client": self.client,
                "enabled": "local" if not self._upload_results else True,
            }
        ):
            for evaluator in summary_evaluators:
                try:
                    summary_eval_result = evaluator(runs, examples)
                    # TODO: Expose public API for this.
                    flattened_results = self.client._select_eval_results(
                        summary_eval_result,
                        fn_name=evaluator.__name__,
                    )
                    aggregate_feedback.extend(flattened_results)
                    if self._upload_results:
                        for result in flattened_results:
                            feedback = result.dict(exclude={"target_run_id"})
                            evaluator_info = feedback.pop("evaluator_info", None)
                            # Summary feedback is attached to the project,
                            # not to an individual run.
                            executor.submit(
                                self.client.create_feedback,
                                **feedback,
                                run_id=None,
                                project_id=project_id,
                                source_info=evaluator_info,
                            )
                except Exception as e:
                    logger.error(
                        f"Error running summary evaluator {repr(evaluator)}: {e}",
                        exc_info=True,
                    )
    yield {"results": aggregate_feedback}

1771 

1772 def _get_dataset_version(self) -> Optional[str]: 

1773 examples = list(self.examples) 

1774 modified_at = [ex.modified_at for ex in examples if ex.modified_at] 

1775 # Should always be defined in practice when fetched, 

1776 # but the typing permits None 

1777 max_modified_at = max(modified_at) if modified_at else None 

1778 return max_modified_at.isoformat() if max_modified_at else None 

1779 

1780 def _get_dataset_splits(self) -> Optional[list[str]]: 

1781 examples = list(self.examples) 

1782 splits = set() 

1783 for example in examples: 

1784 if ( 

1785 example.metadata 

1786 and example.metadata.get("dataset_split") 

1787 and isinstance(example.metadata["dataset_split"], list) 

1788 ): 

1789 for split in example.metadata["dataset_split"]: 

1790 if isinstance(split, str): 

1791 splits.add(split) 

1792 else: 

1793 splits.add("base") 

1794 

1795 return list(splits) 

1796 

1797 def _end(self) -> None: 

1798 if not self._upload_results: 

1799 return 

1800 experiment = self._experiment 

1801 if experiment is None: 

1802 raise ValueError("Experiment not started yet.") 

1803 

1804 project_metadata = self._get_experiment_metadata() 

1805 project_metadata["dataset_version"] = self._get_dataset_version() 

1806 project_metadata["dataset_splits"] = self._get_dataset_splits() 

1807 self.client.update_project( 

1808 experiment.id, 

1809 metadata={ 

1810 **experiment.metadata, 

1811 **project_metadata, 

1812 }, 

1813 ) 

1814 

1815 def _copy(self, *args: Any, **kwargs: Any) -> _ExperimentManager: 

1816 default_args = (self._data,) 

1817 default_kwargs = { 

1818 "experiment": self._experiment, 

1819 "metadata": self._metadata, 

1820 "runs": self._runs, 

1821 "client": self.client, 

1822 "evaluation_results": self._evaluation_results, 

1823 "summary_results": self._summary_results, 

1824 "include_attachments": self._include_attachments, 

1825 "reuse_attachments": self._reuse_attachments, 

1826 "upload_results": self._upload_results, 

1827 "attachment_raw_data_dict": self._attachment_raw_data_dict, 

1828 "error_handling": self._error_handling, 

1829 } 

1830 full_args = list(args) + list(default_args[len(args) :]) 

1831 full_kwargs = {**default_kwargs, **kwargs} 

1832 return self.__class__(*full_args, **full_kwargs) 

1833 

1834 

1835def _resolve_evaluators( 

1836 evaluators: Sequence[Union[EVALUATOR_T, RunEvaluator, AEVALUATOR_T]], 

1837) -> Sequence[RunEvaluator]: 

1838 results = [] 

1839 for evaluator in evaluators: 

1840 if isinstance(evaluator, RunEvaluator): 

1841 results.append(evaluator) 

1842 elif isinstance(evaluator, LangChainStringEvaluator): 

1843 results.append(evaluator.as_run_evaluator()) 

1844 else: 

1845 results.append(run_evaluator(evaluator)) 

1846 return results 

1847 

1848 

1849def _wrap_summary_evaluators( 

1850 evaluators: Sequence[SUMMARY_EVALUATOR_T], 

1851) -> list[SUMMARY_EVALUATOR_T]: 

1852 def _wrap(evaluator: SUMMARY_EVALUATOR_T) -> SUMMARY_EVALUATOR_T: 

1853 eval_name = getattr(evaluator, "__name__", "BatchEvaluator") 

1854 evaluator = _normalize_summary_evaluator(evaluator) 

1855 

1856 @functools.wraps(evaluator) 

1857 def _wrapper_inner( 

1858 runs: Sequence[schemas.Run], examples: Sequence[schemas.Example] 

1859 ) -> Union[EvaluationResult, EvaluationResults]: 

1860 @rh.traceable(name=eval_name) 

1861 def _wrapper_super_inner( 

1862 runs_: str, examples_: str 

1863 ) -> Union[EvaluationResult, EvaluationResults]: 

1864 return evaluator(list(runs), list(examples)) 

1865 

1866 return _wrapper_super_inner( 

1867 f"Runs[] (Length={len(runs)})", f"Examples[] (Length={len(examples)})" 

1868 ) 

1869 

1870 return _wrapper_inner 

1871 

1872 results = [] 

1873 for evaluator in evaluators: 

1874 results.append(_wrap(evaluator)) 

1875 return results 

1876 

1877 

class _ForwardResults(TypedDict):
    """Pairing of a target run with the example it was produced from.

    This is the return shape of ``_forward``.
    """

    # Run captured through the target's ``on_end`` callback.
    # NOTE(review): ``_forward`` only ``cast``s this value, so it looks like it
    # can be None at runtime if the callback never fired -- confirm.
    run: schemas.Run
    # The dataset example the target was invoked with.
    example: schemas.Example

1881 

1882 

def _forward(
    fn: rh.SupportsLangsmithExtra,
    example: schemas.Example,
    experiment_name: str,
    metadata: dict,
    client: langsmith.Client,
    upload_results: bool,
    include_attachments: bool = False,
    error_handling: Literal["log", "ignore"] = "log",
) -> _ForwardResults:
    """Invoke the target on a single example and capture the resulting run.

    The target receives the example fields named by ``_get_target_args`` as
    positional arguments, plus a ``langsmith_extra`` carrying the experiment
    name, the metadata (extended with ``example_version``) and the client.
    Exceptions raised while calling the target are logged, not propagated.

    With ``error_handling="log"`` the run is always linked to the example;
    with ``"ignore"`` the link is only set through the success callback.

    Raises:
        ValueError: If ``error_handling`` is neither ``"log"`` nor ``"ignore"``.
    """
    run: Optional[schemas.RunBase] = None

    def _capture_run(r: rt.RunTree) -> None:
        nonlocal run
        run = r

    def _link_example(r: rt.RunTree) -> None:
        r.reference_example_id = example.id

    version_stamp = example.modified_at or example.created_at
    run_metadata = {**metadata}
    run_metadata["example_version"] = version_stamp.isoformat()
    langsmith_extra = rh.LangSmithExtra(
        on_end=_capture_run,
        project_name=experiment_name,
        metadata=run_metadata,
        client=client,
    )
    if error_handling == "log":
        langsmith_extra["reference_example_id"] = example.id
    elif error_handling == "ignore":
        # Only set the reference_example_id if the run succeeds.
        langsmith_extra["_on_success"] = _link_example
    else:
        raise ValueError(f"Unrecognized error_handling value: {error_handling=}")

    tracing_mode = True if upload_results else "local"
    with rh.tracing_context(enabled=tracing_mode):
        try:
            call_args = [getattr(example, name) for name in _get_target_args(fn)]
            fn(*call_args, langsmith_extra=langsmith_extra)
            # Rewind attachment readers so later consumers can read them again.
            if include_attachments and example.attachments is not None:
                for attachment_name in example.attachments:
                    example.attachments[attachment_name]["reader"].seek(0)
        except Exception as e:
            logger.error(
                f"Error running target function: {e}", exc_info=True, stacklevel=1
            )
        return _ForwardResults(run=cast(schemas.Run, run), example=example)

1932 

1933 

1934def _is_valid_uuid(value: str) -> bool: 

1935 try: 

1936 uuid.UUID(value) 

1937 return True 

1938 except ValueError: 

1939 return False 

1940 

1941 

1942def _resolve_data( 

1943 data: DATA_T, 

1944 *, 

1945 client: langsmith.Client, 

1946 include_attachments: bool = False, 

1947) -> Iterable[schemas.Example]: 

1948 """Return the examples for the given dataset.""" 

1949 if isinstance(data, uuid.UUID): 

1950 return client.list_examples( 

1951 dataset_id=data, include_attachments=include_attachments 

1952 ) 

1953 elif isinstance(data, str) and _is_valid_uuid(data): 

1954 return client.list_examples( 

1955 dataset_id=uuid.UUID(data), include_attachments=include_attachments 

1956 ) 

1957 elif isinstance(data, str): 

1958 return client.list_examples( 

1959 dataset_name=data, include_attachments=include_attachments 

1960 ) 

1961 elif isinstance(data, schemas.Dataset): 

1962 return client.list_examples( 

1963 dataset_id=data.id, include_attachments=include_attachments 

1964 ) 

1965 return data 

1966 

1967 

def _ensure_traceable(
    target: TARGET_T | rh.SupportsLangsmithExtra[[dict], dict] | Runnable,
) -> rh.SupportsLangsmithExtra[[dict], dict]:
    """Return a traceable version of ``target``.

    Targets that are already traceable are returned unchanged. LangChain
    runnables are traced through their ``invoke`` method; any other callable
    is wrapped directly. The wrapper is named ``"Target"``.

    Raises:
        ValueError: If ``target`` is not callable.
    """
    if not _is_callable(target):
        raise ValueError(
            "Target must be a callable function or a langchain/langgraph object. For "
            "example:\n\n"
            "def predict(inputs: dict) -> dict:\n"
            " # do work, like chain.invoke(inputs)\n"
            " return {...}\n\n"
            "evaluate(\n"
            " predict,\n"
            " ...\n"
            ")"
        )

    if rh.is_traceable_function(target):
        return target

    if _is_langchain_runnable(target):
        to_wrap = target.invoke  # type: ignore[union-attr]
    else:
        to_wrap = target
    return rh.traceable(name="Target")(cast(Callable, to_wrap))

1992 

1993 

def _include_attachments(target: Any, evaluators: Optional[Sequence]) -> bool:
    """Return True when the target or any evaluator takes ``attachments``."""
    if _target_include_attachments(target):
        return True
    return bool(_evaluators_include_attachments(evaluators))

1998 

1999 

2000def _evaluators_include_attachments(evaluators: Optional[Sequence]) -> int: 

2001 if evaluators is None: 

2002 return 0 

2003 

2004 return sum(_evaluator_uses_attachments(e) for e in evaluators) 

2005 

2006 

2007def _evaluator_uses_attachments(evaluator: Any) -> bool: 

2008 if not callable(evaluator): 

2009 return False 

2010 sig = inspect.signature(evaluator) 

2011 params = list(sig.parameters.values()) 

2012 positional_params = [ 

2013 p for p in params if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD) 

2014 ] 

2015 return any(p.name == "attachments" for p in positional_params) 

2016 

2017 

def _target_include_attachments(target: Any) -> bool:
    """Return True when ``attachments`` is among the target's resolved arguments."""
    accepted_args = _get_target_args(target)
    return "attachments" in accepted_args

2021 

2022 

2023def _get_target_args(target: Any) -> list[str]: 

2024 """Whether the target function accepts attachments.""" 

2025 if not callable(target): 

2026 return [] 

2027 if _is_langchain_runnable(target): 

2028 return ["inputs"] 

2029 # Check function signature 

2030 sig = inspect.signature(target) 

2031 params = list(sig.parameters.values()) 

2032 positional_params = [ 

2033 p for p in params if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD) 

2034 ] 

2035 positional_no_default = [p for p in positional_params if p.default is p.empty] 

2036 

2037 if len(positional_params) == 0: 

2038 raise ValueError( 

2039 "Target function must accept at least one positional argument (inputs)." 

2040 ) 

2041 elif len(positional_no_default) > 3: 

2042 raise ValueError( 

2043 "Target function must accept at most three " 

2044 "arguments without default values: (inputs, attachments, metadata)." 

2045 ) 

2046 elif len(positional_no_default) > 1 and { 

2047 p.name for p in positional_no_default 

2048 }.difference(["inputs", "attachments", "metadata"]): 

2049 raise ValueError( 

2050 "When passing multiple positional arguments without default values, they " 

2051 "must be named 'inputs', 'attachments', or 'metadata'. Received: " 

2052 f"{[p.name for p in positional_no_default]}" 

2053 ) 

2054 else: 

2055 args = [] 

2056 for p in positional_params[:3]: 

2057 if p.name in {"inputs", "attachments", "metadata"}: 

2058 args.append(p.name) 

2059 else: 

2060 break 

2061 return args or ["inputs"] 

2062 

2063 

2064def _resolve_experiment( 

2065 experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]], 

2066 runs: Optional[Iterable[schemas.Run]], 

2067 client: langsmith.Client, 

2068) -> tuple[ 

2069 Optional[Union[schemas.TracerSession, str]], Optional[Iterable[schemas.Run]] 

2070]: 

2071 # TODO: Remove this, handle outside the manager 

2072 if experiment is not None: 

2073 if isinstance(experiment, schemas.TracerSession): 

2074 experiment_ = experiment 

2075 else: 

2076 experiment_ = _load_experiment(experiment, client) 

2077 

2078 if not experiment_.name: 

2079 raise ValueError("Experiment name must be defined if provided.") 

2080 if not experiment_.reference_dataset_id: 

2081 raise ValueError( 

2082 "Experiment must have an associated reference_dataset_id, " 

2083 "but none was provided." 

2084 ) 

2085 return experiment_, runs 

2086 # If we have runs, that means the experiment was already started. 

2087 if runs is not None: 

2088 runs_, runs = itertools.tee(runs) 

2089 first_run = next(runs_) 

2090 experiment_ = client.read_project(project_id=first_run.session_id) 

2091 if not experiment_.name: 

2092 raise ValueError("Experiment name not found for provided runs.") 

2093 return experiment_, runs 

2094 return None, None 

2095 

2096 

def _get_random_name() -> str:
    """Return a name produced by the package's ``random_name`` generator."""
    # Imported at call time rather than at module import.
    from langsmith.evaluation._name_generation import random_name

    generated = random_name()
    return generated

2101 

2102 

def _extract_feedback_keys(evaluator: RunEvaluator):
    """Best-effort lookup of the feedback keys an evaluator will emit.

    For a ``DynamicRunEvaluator`` the keys are extracted from the source of
    its ``func`` (or, failing that, ``afunc``). For objects wrapping an inner
    ``evaluator`` that has an ``evaluation_name``, that name is the only key.
    Anything else yields an empty list.
    """
    if isinstance(evaluator, DynamicRunEvaluator):
        for attr_name in ("func", "afunc"):
            implementation = getattr(evaluator, attr_name, None)
            if implementation:
                return _extract_code_evaluator_feedback_keys(implementation)
    # TODO: Support for DynamicComparisonRunEvaluator
    if hasattr(evaluator, "evaluator"):
        # LangChainStringEvaluator
        evaluation_name = getattr(evaluator.evaluator, "evaluation_name", None)
        if evaluation_name:
            return [evaluation_name]
    return []

2115 

2116 

2117def _extract_code_evaluator_feedback_keys(func: Callable) -> list[str]: 

2118 python_code = inspect.getsource(func) 

2119 

2120 def extract_dict_keys(node): 

2121 if isinstance(node, ast.Dict): 

2122 keys = [] 

2123 key_value = None 

2124 for key, value in zip(node.keys, node.values): 

2125 if isinstance(key, (ast.Str, ast.Constant)): 

2126 key_str = key.s if isinstance(key, ast.Str) else key.value 

2127 if key_str == "key" and isinstance(value, (ast.Str, ast.Constant)): 

2128 key_value = ( 

2129 value.s if isinstance(value, ast.Str) else value.value 

2130 ) 

2131 return [key_value] if key_value else keys 

2132 elif ( 

2133 isinstance(node, ast.Call) 

2134 and isinstance(node.func, ast.Name) 

2135 and node.func.id == "dict" 

2136 ): 

2137 for keyword in node.keywords: 

2138 if keyword.arg == "key" and isinstance( 

2139 keyword.value, (ast.Str, ast.Constant) 

2140 ): 

2141 return [ 

2142 ( 

2143 keyword.value.s 

2144 if isinstance(keyword.value, ast.Str) 

2145 else keyword.value.value 

2146 ) 

2147 ] 

2148 return [] 

2149 

2150 def extract_evaluation_result_key(node): 

2151 if ( 

2152 isinstance(node, ast.Call) 

2153 and isinstance(node.func, ast.Name) 

2154 and node.func.id == "EvaluationResult" 

2155 ): 

2156 for keyword in node.keywords: 

2157 if keyword.arg == "key" and isinstance( 

2158 keyword.value, (ast.Str, ast.Constant) 

2159 ): 

2160 return [ 

2161 ( 

2162 keyword.value.s 

2163 if isinstance(keyword.value, ast.Str) 

2164 else keyword.value.value 

2165 ) 

2166 ] 

2167 return [] 

2168 

2169 def extract_evaluation_results_keys(node, variables): 

2170 if ( 

2171 isinstance(node, ast.Call) 

2172 and isinstance(node.func, ast.Name) 

2173 and node.func.id == "EvaluationResults" 

2174 ): 

2175 for keyword in node.keywords: 

2176 if keyword.arg == "results": 

2177 if isinstance(keyword.value, ast.Name): 

2178 return variables.get(keyword.value.id, []) 

2179 elif isinstance(keyword.value, ast.List): 

2180 keys = [] 

2181 for elt in keyword.value.elts: 

2182 keys.extend(extract_evaluation_result_key(elt)) 

2183 return keys 

2184 elif isinstance(node, ast.Dict): 

2185 for key, value in zip(node.keys, node.values): 

2186 if isinstance(key, (ast.Str, ast.Constant)) and key.s == "results": 

2187 if isinstance(value, ast.List): 

2188 keys = [] 

2189 for elt in value.elts: 

2190 if isinstance(elt, ast.Dict): 

2191 for elt_key, elt_value in zip(elt.keys, elt.values): 

2192 if ( 

2193 isinstance(elt_key, (ast.Str, ast.Constant)) 

2194 and elt_key.s == "key" 

2195 ): 

2196 if isinstance( 

2197 elt_value, (ast.Str, ast.Constant) 

2198 ): 

2199 keys.append(elt_value.s) 

2200 elif ( 

2201 isinstance(elt, ast.Call) 

2202 and isinstance(elt.func, ast.Name) 

2203 and elt.func.id in ("EvaluationResult", "dict") 

2204 ): 

2205 for keyword in elt.keywords: 

2206 if keyword.arg == "key" and isinstance( 

2207 keyword.value, (ast.Str, ast.Constant) 

2208 ): 

2209 keys.append( 

2210 keyword.value.s 

2211 if isinstance(keyword.value, ast.Str) 

2212 else keyword.value.value 

2213 ) 

2214 

2215 return keys 

2216 return [] 

2217 

2218 python_code = textwrap.dedent(python_code) 

2219 

2220 try: 

2221 tree = ast.parse(python_code) 

2222 function_def = tree.body[0] 

2223 if not isinstance(function_def, (ast.FunctionDef, ast.AsyncFunctionDef)): 

2224 return [] 

2225 

2226 variables = {} 

2227 keys = [] 

2228 

2229 for node in ast.walk(function_def): 

2230 if isinstance(node, ast.Assign): 

2231 if isinstance(node.value, ast.List): 

2232 list_keys = [] 

2233 for elt in node.value.elts: 

2234 list_keys.extend(extract_evaluation_result_key(elt)) 

2235 if isinstance(node.targets[0], ast.Name): 

2236 variables[node.targets[0].id] = list_keys 

2237 elif isinstance(node, ast.Return) and node.value is not None: 

2238 dict_keys = extract_dict_keys(node.value) 

2239 eval_result_key = extract_evaluation_result_key(node.value) 

2240 eval_results_keys = extract_evaluation_results_keys( 

2241 node.value, variables 

2242 ) 

2243 

2244 keys.extend(dict_keys) 

2245 keys.extend(eval_result_key) 

2246 keys.extend(eval_results_keys) 

2247 

2248 # If no keys found, return the function name 

2249 return keys if keys else [function_def.name] 

2250 

2251 except SyntaxError: 

2252 return [] 

2253 

2254 

def _to_pandas(
    results: list[ExperimentResultRow],
    start: Optional[int] = 0,
    end: Optional[int] = None,
):
    """Convert experiment result rows into a pandas ``DataFrame``.

    ``start`` and ``end`` select the slice of ``results`` to convert. pandas
    is imported on demand.

    Raises:
        ImportError: If pandas is not installed.
    """
    try:
        import pandas as pd
    except ImportError as import_error:
        raise ImportError(
            "The 'pandas' library is required to use the 'to_pandas' function. "
            "Please install it using 'pip install pandas' or "
            "'conda install pandas' before calling this method."
        ) from import_error

    flat_rows = _flatten_experiment_results(results, start=start, end=end)
    return pd.DataFrame(flat_rows)

2270 

2271 

2272def _flatten_experiment_results( 

2273 results: list[ExperimentResultRow], 

2274 start: Optional[int] = 0, 

2275 end: Optional[int] = None, 

2276): 

2277 return [ 

2278 { 

2279 **{f"inputs.{k}": v for k, v in (x["example"].inputs or {}).items()}, 

2280 **{f"outputs.{k}": v for k, v in (x["run"].outputs or {}).items()}, 

2281 "error": x["run"].error, 

2282 **( 

2283 {f"reference.{k}": v for k, v in x["example"].outputs.items()} 

2284 if x["example"].outputs is not None 

2285 else {} 

2286 ), 

2287 **{ 

2288 f"feedback.{r.key}": r.score if r.score is not None else r.value 

2289 for r in x["evaluation_results"]["results"] 

2290 }, 

2291 "execution_time": ( 

2292 (x["run"].end_time - x["run"].start_time).total_seconds() 

2293 if x["run"].end_time 

2294 else None 

2295 ), 

2296 "example_id": x["run"].reference_example_id, 

2297 "id": x["run"].id, 

2298 } 

2299 for x in results[start:end] 

2300 ] 

2301 

2302 

2303@functools.lru_cache(maxsize=1) 

2304def _import_langchain_runnable() -> Optional[type]: 

2305 try: 

2306 from langchain_core.runnables import Runnable 

2307 

2308 return Runnable 

2309 except ImportError: 

2310 return None 

2311 

2312 

def _is_langchain_runnable(o: Any) -> bool:
    """Return True when ``o`` is an instance of LangChain's ``Runnable``.

    Always False when the ``Runnable`` class could not be imported.
    """
    runnable_cls = _import_langchain_runnable()
    if not runnable_cls:
        return False
    return isinstance(o, runnable_cls)