Coverage for langsmith/evaluation/_arunner.py: 17%

384 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-12-11 16:15 -0800

1"""V2 Evaluation Interface.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import concurrent.futures as cf 

7import io 

8import logging 

9import pathlib 

10import uuid 

11from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Iterable, Sequence 

12from typing import ( 

13 TYPE_CHECKING, 

14 Any, 

15 Callable, 

16 Literal, 

17 Optional, 

18 TypeVar, 

19 Union, 

20 cast, 

21) 

22 

23import langsmith 

24from langsmith import run_helpers as rh 

25from langsmith import run_trees, schemas 

26from langsmith import run_trees as rt 

27from langsmith import utils as ls_utils 

28from langsmith._internal import _aiter as aitertools 

29from langsmith._internal._beta_decorator import _warn_once 

30from langsmith.evaluation._runner import ( 

31 AEVALUATOR_T, 

32 DATA_T, 

33 EVALUATOR_T, 

34 ExperimentResultRow, 

35 _evaluators_include_attachments, 

36 _ExperimentManagerMixin, 

37 _extract_feedback_keys, 

38 _ForwardResults, 

39 _get_target_args, 

40 _is_langchain_runnable, 

41 _load_examples_map, 

42 _load_experiment, 

43 _load_tqdm, 

44 _load_traces, 

45 _resolve_data, 

46 _resolve_evaluators, 

47 _resolve_experiment, 

48 _target_include_attachments, 

49 _to_pandas, 

50 _wrap_summary_evaluators, 

51) 

52from langsmith.evaluation.evaluator import ( 

53 SUMMARY_EVALUATOR_T, 

54 EvaluationResult, 

55 EvaluationResults, 

56 RunEvaluator, 

57) 

58 

59if TYPE_CHECKING: 

60 import pandas as pd 

61 from langchain_core.runnables import Runnable 

62 

63 DataFrame = pd.DataFrame 

64else: 

65 DataFrame = Any 

66 

67logger = logging.getLogger(__name__) 

68 

69ATARGET_T = Union[ 

70 Callable[[dict], Awaitable[dict]], Callable[[dict, dict], Awaitable[dict]] 

71] 

72 

73 

74async def aevaluate( 

75 target: Union[ 

76 ATARGET_T, AsyncIterable[dict], Runnable, str, uuid.UUID, schemas.TracerSession 

77 ], 

78 /, 

79 data: Union[ 

80 DATA_T, AsyncIterable[schemas.Example], Iterable[schemas.Example], None 

81 ] = None, 

82 evaluators: Optional[Sequence[Union[EVALUATOR_T, AEVALUATOR_T]]] = None, 

83 summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None, 

84 metadata: Optional[dict] = None, 

85 experiment_prefix: Optional[str] = None, 

86 description: Optional[str] = None, 

87 max_concurrency: Optional[int] = 0, 

88 num_repetitions: int = 1, 

89 client: Optional[langsmith.Client] = None, 

90 blocking: bool = True, 

91 experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]] = None, 

92 upload_results: bool = True, 

93 error_handling: Literal["log", "ignore"] = "log", 

94 **kwargs: Any, 

95) -> AsyncExperimentResults: 

96 r"""Evaluate an async target system on a given dataset. 

97 

98 Args: 

99 target (AsyncCallable[[dict], dict] | AsyncIterable[dict] | Runnable | EXPERIMENT_T | Tuple[EXPERIMENT_T, EXPERIMENT_T]): 

100 The target system or experiment(s) to evaluate. 

101 

102 Can be an async function that takes a `dict` and returns a `dict`, a 

103 langchain `Runnable`, an existing experiment ID, or a two-tuple of experiment IDs. 

104 data (Union[DATA_T, AsyncIterable[schemas.Example]]): The dataset to evaluate on. 

105 

106 Can be a dataset name, a list of examples, an async generator of examples, or an async iterable of examples. 

107 evaluators (Optional[Sequence[EVALUATOR_T]]): A list of evaluators to run 

108 on each example. 

109 summary_evaluators (Optional[Sequence[SUMMARY_EVALUATOR_T]]): A list of summary 

110 evaluators to run on the entire dataset. 

111 metadata (Optional[dict]): Metadata to attach to the experiment. 

112 experiment_prefix (Optional[str]): A prefix to provide for your experiment name. 

113 description (Optional[str]): A description of the experiment. 

114 max_concurrency (int | None): The maximum number of concurrent 

115 evaluations to run. 

116 

117 If `None` then no limit is set. If `0` then no concurrency. 

118 num_repetitions (int): The number of times to run the evaluation. 

119 Each item in the dataset will be run and evaluated this many times. 

120 client (Optional[langsmith.Client]): The LangSmith client to use. 

121 blocking (bool): Whether to block until the evaluation is complete. 

122 experiment (Optional[schemas.TracerSession]): An existing experiment to 

123 extend. 

124 

125 If provided, `experiment_prefix` is ignored. For advanced usage only. 

126 error_handling (str, default="log"): How to handle individual run errors. 

127 

128 `'log'` will trace the runs with the error message as part of the 

129 experiment, `'ignore'` will not count the run as part of the experiment at 

130 all. 

131 

132 Returns: 

133 An async iterator over the experiment results. 

134 

135 Environment: 

136 - `LANGSMITH_TEST_CACHE`: If set, API calls will be cached to disk to save time and 

137 cost during testing. 

138 

139 Recommended to commit the cache files to your repository for faster CI/CD runs. 

140 

141 Requires the `'langsmith[vcr]'` package to be installed. 

142 

143 Examples: 

144 >>> from typing import Sequence 

145 >>> from langsmith import Client, aevaluate 

146 >>> from langsmith.schemas import Example, Run 

147 >>> client = Client() 

148 >>> dataset = client.clone_public_dataset( 

149 ... "https://smith.langchain.com/public/419dcab2-1d66-4b94-8901-0357ead390df/d" 

150 ... ) 

151 >>> dataset_name = "Evaluate Examples" 

152 

153 Basic usage: 

154 

155 >>> def accuracy(run: Run, example: Example): 

156 ... # Row-level evaluator for accuracy. 

157 ... pred = run.outputs["output"] 

158 ... expected = example.outputs["answer"] 

159 ... return {"score": expected.lower() == pred.lower()} 

160 

161 >>> def precision(runs: Sequence[Run], examples: Sequence[Example]): 

162 ... # Experiment-level evaluator for precision. 

163 ... # TP / (TP + FP) 

164 ... predictions = [run.outputs["output"].lower() for run in runs] 

165 ... expected = [example.outputs["answer"].lower() for example in examples] 

166 ... # yes and no are the only possible answers 

167 ... tp = sum([p == e for p, e in zip(predictions, expected) if p == "yes"]) 

168 ... fp = sum([p == "yes" and e == "no" for p, e in zip(predictions, expected)]) 

169 ... return {"score": tp / (tp + fp)} 

170 

171 >>> import asyncio 

172 >>> async def apredict(inputs: dict) -> dict: 

173 ... # This can be any async function or just an API call to your app. 

174 ... await asyncio.sleep(0.1) 

175 ... return {"output": "Yes"} 

176 >>> results = asyncio.run( 

177 ... aevaluate( 

178 ... apredict, 

179 ... data=dataset_name, 

180 ... evaluators=[accuracy], 

181 ... summary_evaluators=[precision], 

182 ... experiment_prefix="My Experiment", 

183 ... description="Evaluate the accuracy of the model asynchronously.", 

184 ... metadata={ 

185 ... "my-prompt-version": "abcd-1234", 

186 ... }, 

187 ... ) 

188 ... ) # doctest: +ELLIPSIS 

189 View the evaluation results for experiment:... 

190 

191 Evaluating over only a subset of the examples using an async generator: 

192 

193 >>> async def example_generator(): 

194 ... examples = client.list_examples(dataset_name=dataset_name, limit=5) 

195 ... for example in examples: 

196 ... yield example 

197 >>> results = asyncio.run( 

198 ... aevaluate( 

199 ... apredict, 

200 ... data=example_generator(), 

201 ... evaluators=[accuracy], 

202 ... summary_evaluators=[precision], 

203 ... experiment_prefix="My Subset Experiment", 

204 ... description="Evaluate a subset of examples asynchronously.", 

205 ... ) 

206 ... ) # doctest: +ELLIPSIS 

207 View the evaluation results for experiment:... 

208 

209 Streaming each prediction to more easily + eagerly debug. 

210 

211 >>> results = asyncio.run( 

212 ... aevaluate( 

213 ... apredict, 

214 ... data=dataset_name, 

215 ... evaluators=[accuracy], 

216 ... summary_evaluators=[precision], 

217 ... experiment_prefix="My Streaming Experiment", 

218 ... description="Streaming predictions for debugging.", 

219 ... blocking=False, 

220 ... ) 

221 ... ) # doctest: +ELLIPSIS 

222 View the evaluation results for experiment:... 

223 

224 >>> async def aenumerate(iterable): 

225 ... async for elem in iterable: 

226 ... print(elem) 

227 >>> asyncio.run(aenumerate(results)) 

228 

229 Running without concurrency: 

230 

231 >>> results = asyncio.run( 

232 ... aevaluate( 

233 ... apredict, 

234 ... data=dataset_name, 

235 ... evaluators=[accuracy], 

236 ... summary_evaluators=[precision], 

237 ... experiment_prefix="My Experiment Without Concurrency", 

238 ... description="This was run without concurrency.", 

239 ... max_concurrency=0, 

240 ... ) 

241 ... ) # doctest: +ELLIPSIS 

242 View the evaluation results for experiment:... 

243 

244 Using Async evaluators: 

245 

246 >>> async def helpfulness(run: Run, example: Example): 

247 ... # Row-level evaluator for helpfulness. 

248 ... await asyncio.sleep(5) # Replace with your LLM API call 

249 ... return {"score": run.outputs["output"] == "Yes"} 

250 

251 >>> results = asyncio.run( 

252 ... aevaluate( 

253 ... apredict, 

254 ... data=dataset_name, 

255 ... evaluators=[helpfulness], 

256 ... summary_evaluators=[precision], 

257 ... experiment_prefix="My Helpful Experiment", 

258 ... description="Applying async evaluators example.", 

259 ... ) 

260 ... ) # doctest: +ELLIPSIS 

261 View the evaluation results for experiment:... 

262 

263 

264 !!! warning "Behavior changed in `langsmith` 0.2.0" 

265 

266 'max_concurrency' default updated from None (no limit on concurrency) 

267 to 0 (no concurrency at all). 

268 """ # noqa: E501 

269 if isinstance(target, (str, uuid.UUID, schemas.TracerSession)): 

270 invalid_args = { 

271 "num_repetitions": num_repetitions > 1, 

272 "experiment": bool(experiment), 

273 "upload_results": not upload_results, 

274 "experiment_prefix": bool(experiment_prefix), 

275 "data": bool(data), 

276 } 

277 if any(invalid_args.values()): 

278 msg = ( 

279 f"Received invalid arguments. " 

280 f"{tuple(k for k, v in invalid_args.items() if v)} should not be " 

281 f"specified when target is an existing experiment." 

282 ) 

283 raise ValueError(msg) 

284 target_id = target if isinstance(target, (str, uuid.UUID)) else target.id 

285 logger.debug(f"Running evaluation over existing experiment {target_id}...") 

286 return await aevaluate_existing( 

287 target, 

288 evaluators=evaluators, 

289 summary_evaluators=summary_evaluators, 

290 metadata=metadata, 

291 max_concurrency=max_concurrency, 

292 client=client, 

293 blocking=blocking, 

294 **kwargs, 

295 ) 

296 elif isinstance(target, (list, tuple)): 

297 msg = ( 

298 "Running a comparison of two existing experiments asynchronously is not " 

299 "currently supported. Please use the `evaluate()` method instead and make " 

300 "sure that your evaluators are defined as synchronous functions." 

301 ) 

302 raise ValueError(msg) 

303 elif kwargs: 

304 msg = ( 

305 f"Received unsupported arguments {kwargs}. These arguments are not " 

306 f"supported when creating a new experiment." 

307 ) 

308 raise ValueError(msg) 

309 elif not data: 

310 msg = "Must specify 'data' when running evaluations over a target function." 

311 raise ValueError(msg) 

312 elif experiment and experiment_prefix: 

313 msg = ( 

314 "Expected at most one of 'experiment' or 'experiment_prefix'," 

315 " but both were provided. " 

316 f"Got: experiment={experiment}, experiment_prefix={experiment_prefix}" 

317 ) 

318 raise ValueError(msg) 

319 else: 

320 if not upload_results: 

321 _warn_once("'upload_results' parameter is in beta.") 

322 logger.debug(f"Running evaluation over target system {target}...") 

323 return await _aevaluate( 

324 target, 

325 data=data, 

326 evaluators=evaluators, 

327 summary_evaluators=summary_evaluators, 

328 metadata=metadata, 

329 experiment_prefix=experiment_prefix, 

330 description=description, 

331 max_concurrency=max_concurrency, 

332 num_repetitions=num_repetitions, 

333 client=client, 

334 blocking=blocking, 

335 experiment=experiment, 

336 upload_results=upload_results, 

337 error_handling=error_handling, 

338 ) 

339 

340 

341async def aevaluate_existing( 

342 experiment: Union[str, uuid.UUID, schemas.TracerSession], 

343 /, 

344 evaluators: Optional[Sequence[Union[EVALUATOR_T, AEVALUATOR_T]]] = None, 

345 summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None, 

346 metadata: Optional[dict] = None, 

347 max_concurrency: Optional[int] = 0, 

348 client: Optional[langsmith.Client] = None, 

349 load_nested: bool = False, 

350 blocking: bool = True, 

351) -> AsyncExperimentResults: 

352 r"""Evaluate existing experiment runs asynchronously. 

353 

354 Args: 

355 experiment (Union[str, uuid.UUID]): The identifier of the experiment to evaluate. 

356 evaluators (Optional[Sequence[EVALUATOR_T]]): Optional sequence of evaluators to use for individual run evaluation. 

357 summary_evaluators (Optional[Sequence[SUMMARY_EVALUATOR_T]]): Optional sequence of evaluators 

358 to apply over the entire dataset. 

359 metadata (Optional[dict]): Optional metadata to include in the evaluation results. 

360 max_concurrency (int | None): The maximum number of concurrent 

361 evaluations to run. 

362 

363 If `None` then no limit is set. If `0` then no concurrency. 

364 client (Optional[langsmith.Client]): Optional Langsmith client to use for evaluation. 

365 load_nested: Whether to load all child runs for the experiment. 

366 

367 Default is to only load the top-level root runs. 

368 blocking (bool): Whether to block until evaluation is complete. 

369 

370 Returns: 

371 An async iterator over the experiment results. 

372 

373 Examples: 

374 Define your evaluators 

375 

376 >>> from typing import Sequence 

377 >>> from langsmith.schemas import Example, Run 

378 >>> def accuracy(run: Run, example: Example): 

379 ... # Row-level evaluator for accuracy. 

380 ... pred = run.outputs["output"] 

381 ... expected = example.outputs["answer"] 

382 ... return {"score": expected.lower() == pred.lower()} 

383 >>> def precision(runs: Sequence[Run], examples: Sequence[Example]): 

384 ... # Experiment-level evaluator for precision. 

385 ... # TP / (TP + FP) 

386 ... predictions = [run.outputs["output"].lower() for run in runs] 

387 ... expected = [example.outputs["answer"].lower() for example in examples] 

388 ... # yes and no are the only possible answers 

389 ... tp = sum([p == e for p, e in zip(predictions, expected) if p == "yes"]) 

390 ... fp = sum([p == "yes" and e == "no" for p, e in zip(predictions, expected)]) 

391 ... return {"score": tp / (tp + fp)} 

392 

393 Load the experiment and run the evaluation. 

394 

395 >>> import asyncio 

396 >>> import uuid 

397 >>> from langsmith import Client, aevaluate, aevaluate_existing 

398 >>> client = Client() 

399 >>> dataset_name = "__doctest_aevaluate_existing_" + uuid.uuid4().hex[:8] 

400 >>> dataset = client.create_dataset(dataset_name) 

401 >>> example = client.create_example( 

402 ... inputs={"question": "What is 2+2?"}, 

403 ... outputs={"answer": "4"}, 

404 ... dataset_id=dataset.id, 

405 ... ) 

406 >>> async def apredict(inputs: dict) -> dict: 

407 ... await asyncio.sleep(0.001) 

408 ... return {"output": "4"} 

409 >>> results = asyncio.run( 

410 ... aevaluate( 

411 ... apredict, data=dataset_name, experiment_prefix="doctest_experiment" 

412 ... ) 

413 ... ) # doctest: +ELLIPSIS 

414 View the evaluation results for experiment:... 

415 >>> experiment_id = results.experiment_name 

416 >>> # Consume all results to ensure evaluation is complete 

417 >>> async def consume_results(): 

418 ... result_list = [r async for r in results] 

419 ... return len(result_list) > 0 

420 >>> asyncio.run(consume_results()) 

421 True 

422 >>> import time 

423 >>> time.sleep(3) 

424 >>> results = asyncio.run( 

425 ... aevaluate_existing( 

426 ... experiment_id, 

427 ... evaluators=[accuracy], 

428 ... summary_evaluators=[precision], 

429 ... ) 

430 ... ) # doctest: +ELLIPSIS 

431 View the evaluation results for experiment:... 

432 >>> client.delete_dataset(dataset_id=dataset.id) 

433 

434 

435 """ # noqa: E501 

436 client = client or run_trees.get_cached_client() 

437 project = ( 

438 experiment 

439 if isinstance(experiment, schemas.TracerSession) 

440 else (await aitertools.aio_to_thread(_load_experiment, experiment, client)) 

441 ) 

442 runs = await aitertools.aio_to_thread( 

443 _load_traces, experiment, client, load_nested=load_nested 

444 ) 

445 data_map = await aitertools.aio_to_thread(_load_examples_map, client, project) 

446 data = [data_map[run.reference_example_id] for run in runs] 

447 return await _aevaluate( 

448 runs, 

449 data=data, 

450 evaluators=evaluators, 

451 summary_evaluators=summary_evaluators, 

452 metadata=metadata, 

453 max_concurrency=max_concurrency, 

454 client=client, 

455 blocking=blocking, 

456 experiment=project, 

457 ) 

458 

459 

460async def _aevaluate( 

461 target: Union[ATARGET_T, AsyncIterable[dict], Iterable[schemas.Run], Runnable], 

462 /, 

463 data: Union[DATA_T, AsyncIterable[schemas.Example]], 

464 evaluators: Optional[Sequence[Union[EVALUATOR_T, AEVALUATOR_T]]] = None, 

465 summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None, 

466 metadata: Optional[dict] = None, 

467 experiment_prefix: Optional[str] = None, 

468 description: Optional[str] = None, 

469 max_concurrency: Optional[int] = None, 

470 num_repetitions: int = 1, 

471 client: Optional[langsmith.Client] = None, 

472 blocking: bool = True, 

473 experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]] = None, 

474 upload_results: bool = True, 

475 error_handling: Literal["log", "ignore"] = "log", 

476) -> AsyncExperimentResults: 

477 is_async_target = ( 

478 asyncio.iscoroutinefunction(target) 

479 or (hasattr(target, "__aiter__") and asyncio.iscoroutine(target.__aiter__())) 

480 or _is_langchain_runnable(target) 

481 ) 

482 client = client or rt.get_cached_client() 

483 runs = None if is_async_target else cast(Iterable[schemas.Run], target) 

484 experiment_, runs = await aitertools.aio_to_thread( 

485 _resolve_experiment, 

486 experiment, 

487 runs, 

488 client, 

489 ) 

490 num_include_attachments = int( 

491 _target_include_attachments(target) 

492 ) + _evaluators_include_attachments(evaluators) 

493 manager = await _AsyncExperimentManager( 

494 data, 

495 client=client, 

496 metadata=metadata, 

497 experiment=experiment_ or experiment_prefix, 

498 description=description, 

499 num_repetitions=num_repetitions, 

500 runs=runs, 

501 include_attachments=num_include_attachments > 0, 

502 reuse_attachments=num_repetitions * num_include_attachments > 1, 

503 upload_results=upload_results, 

504 error_handling=error_handling, 

505 ).astart() 

506 cache_dir = ls_utils.get_cache_dir(None) 

507 if cache_dir is not None: 

508 dsid = await manager.get_dataset_id() 

509 cache_path = pathlib.Path(cache_dir) / f"{dsid}.yaml" 

510 else: 

511 cache_path = None 

512 with ls_utils.with_optional_cache(cache_path, ignore_hosts=[client.api_url]): 

513 if is_async_target: 

514 if evaluators: 

515 # Run predictions and evaluations in a single pipeline 

516 manager = await manager.awith_predictions_and_evaluators( 

517 cast(ATARGET_T, target), evaluators, max_concurrency=max_concurrency 

518 ) 

519 else: 

520 manager = await manager.awith_predictions( 

521 cast(ATARGET_T, target), max_concurrency=max_concurrency 

522 ) 

523 if summary_evaluators: 

524 manager = await manager.awith_summary_evaluators(summary_evaluators) 

525 else: 

526 if evaluators: 

527 manager = await manager.awith_evaluators( 

528 evaluators, max_concurrency=max_concurrency 

529 ) 

530 if summary_evaluators: 

531 manager = await manager.awith_summary_evaluators(summary_evaluators) 

532 results = AsyncExperimentResults(manager) 

533 if blocking: 

534 await results.wait() 

535 return results 

536 

537 

538class _AsyncExperimentManager(_ExperimentManagerMixin): 

539 """Manage the execution of experiments asynchronously. 

540 

541 Supports lazily running predictions and evaluations in parallel to facilitate 

542 result streaming and early debugging. 

543 

544 Args: 

545 data (DATA_T): The data used for the experiment. Can be a dataset name or ID OR 

546 a generator of examples. 

547 runs (Optional[Iterable[schemas.Run]]): The runs associated with the experiment 

548 predictions. 

549 experiment (Optional[schemas.TracerSession]): The tracer session 

550 associated with the experiment. 

551 experiment_prefix (Optional[str]): The prefix for the experiment name. 

552 description (Optional[str]): The description for the experiment. 

553 metadata (Optional[dict]): Additional metadata for the experiment. 

554 client (Optional[langsmith.Client]): The Langsmith client used for 

555 the experiment. 

556 evaluation_results (Optional[Iterable[EvaluationResults]]): The evaluation 

557 sresults for the experiment. 

558 summary_results (Optional[Iterable[EvaluationResults]]): The aggregate results 

559 for the experiment. 

560 num_repetitions (Optional[int], default=1): The number of repetitions for 

561 the experiment. 

562 include_attachments (Optional[bool], default=False): Whether to include 

563 attachments. This is used for when we pull the examples for the experiment. 

564 reuse_attachments (Optional[bool], default=False): Whether to reuse attachments 

565 from examples. This is True if we need to reuse attachments across multiple 

566 target/evaluator functions. 

567 upload_results (Optional[bool], default=True): Whether to upload results 

568 to Langsmith. 

569 attachment_raw_data_dict (Optional[dict]): A dictionary to store raw data 

570 for attachments. Only used if we reuse attachments across multiple 

571 target/evaluator functions. 

572 error_handling (str, default="log"): How to handle individual run errors. 

573 

574 `'log'` will trace the runs with the error message as part of the 

575 experiment, `'ignore'` will not count the run as part of the experiment at 

576 all. 

577 """ 

578 

579 def __init__( 

580 self, 

581 data: Union[DATA_T, AsyncIterable[schemas.Example]], 

582 /, 

583 experiment: Optional[Union[schemas.TracerSession, str]] = None, 

584 metadata: Optional[dict] = None, 

585 runs: Optional[Union[Iterable[schemas.Run], AsyncIterable[schemas.Run]]] = None, 

586 client: Optional[langsmith.Client] = None, 

587 evaluation_results: Optional[AsyncIterable[EvaluationResults]] = None, 

588 summary_results: Optional[AsyncIterable[EvaluationResults]] = None, 

589 description: Optional[str] = None, 

590 num_repetitions: int = 1, 

591 include_attachments: bool = False, 

592 reuse_attachments: bool = False, 

593 upload_results: bool = True, 

594 attachment_raw_data_dict: Optional[dict] = None, 

595 error_handling: Literal["log", "ignore"] = "log", 

596 ): 

597 super().__init__( 

598 experiment=experiment, 

599 metadata=metadata, 

600 client=client, 

601 description=description, 

602 ) 

603 self._data = data 

604 self._examples: Optional[AsyncIterable[schemas.Example]] = None 

605 self._runs = ( 

606 aitertools.ensure_async_iterator(runs) if runs is not None else None 

607 ) 

608 self._evaluation_results = evaluation_results 

609 self._summary_results = summary_results 

610 self._num_repetitions = num_repetitions 

611 self._include_attachments = include_attachments 

612 self._reuse_attachments = reuse_attachments 

613 self._upload_results = upload_results 

614 self._attachment_raw_data_dict = attachment_raw_data_dict 

615 self._error_handling = error_handling 

616 

617 def _reset_example_attachments(self, example: schemas.Example) -> schemas.Example: 

618 """Reset attachment readers for an example. 

619 

620 This is only in the case that an attachment is going to be used by more 

621 than 1 callable (target + evaluators). In that case we keep a single copy 

622 of the attachment data in self._attachment_raw_data_dict, and create 

623 readers from that data. This makes it so that we don't have to keep 

624 copies of the same data in memory, instead we can just create readers 

625 from the same data. 

626 """ 

627 if not hasattr(example, "attachments") or not example.attachments: 

628 return example 

629 

630 new_attachments: dict[str, schemas.AttachmentInfo] = {} 

631 for name, attachment in example.attachments.items(): 

632 if ( 

633 self._attachment_raw_data_dict is not None 

634 and str(example.id) + name in self._attachment_raw_data_dict 

635 ): 

636 new_attachments[name] = { 

637 "presigned_url": attachment["presigned_url"], 

638 "reader": io.BytesIO( 

639 self._attachment_raw_data_dict[str(example.id) + name] 

640 ), 

641 "mime_type": attachment["mime_type"], 

642 } 

643 else: 

644 new_attachments[name] = attachment 

645 

646 # Create a new Example instance with the updated attachments 

647 return schemas.Example( 

648 id=example.id, 

649 created_at=example.created_at, 

650 dataset_id=example.dataset_id, 

651 inputs=example.inputs, 

652 outputs=example.outputs, 

653 metadata=example.metadata, 

654 modified_at=example.modified_at, 

655 source_run_id=example.source_run_id, 

656 attachments=new_attachments, 

657 _host_url=example._host_url, 

658 _tenant_id=example._tenant_id, 

659 ) 

660 

661 async def aget_examples(self) -> AsyncIterator[schemas.Example]: 

662 if self._examples is None: 

663 self._examples = _aresolve_data( 

664 self._data, 

665 client=self.client, 

666 include_attachments=self._include_attachments, 

667 ) 

668 if self._reuse_attachments and self._attachment_raw_data_dict is None: 

669 examples_copy, self._examples = aitertools.atee(self._examples) 

670 self._attachment_raw_data_dict = { 

671 str(e.id) + name: value["reader"].read() 

672 async for e in examples_copy 

673 for name, value in (e.attachments or {}).items() 

674 } 

675 if self._num_repetitions > 1: 

676 examples_list = [example async for example in self._examples] 

677 self._examples = async_chain_from_iterable( 

678 [ 

679 async_iter_from_list( 

680 [ 

681 self._reset_example_attachments(example) 

682 for example in examples_list 

683 ] 

684 ) 

685 for _ in range(self._num_repetitions) 

686 ] 

687 ) 

688 

689 self._examples, examples_iter = aitertools.atee( 

690 aitertools.ensure_async_iterator(self._examples), 2, lock=asyncio.Lock() 

691 ) 

692 return examples_iter 

693 

694 async def get_dataset_id(self) -> str: 

695 if self._experiment is None or not getattr( 

696 self._experiment, "reference_dataset_id", None 

697 ): 

698 example = await aitertools.py_anext(await self.aget_examples()) 

699 if example is None: 

700 raise ValueError("No examples found in the dataset.") 

701 return str(example.dataset_id) 

702 return str(self._experiment.reference_dataset_id) 

703 

704 async def aget_runs(self) -> AsyncIterator[schemas.Run]: 

705 if self._runs is None: 

706 raise ValueError("Runs not loaded yet.") 

707 self._runs, runs = aitertools.atee( 

708 aitertools.ensure_async_iterator(self._runs), 2, lock=asyncio.Lock() 

709 ) 

710 async for run in runs: 

711 yield run 

712 

713 async def aget_evaluation_results(self) -> AsyncIterator[EvaluationResults]: 

714 if self._evaluation_results is None: 

715 async for _ in await self.aget_examples(): 

716 yield {"results": []} 

717 else: 

718 self._evaluation_results, evaluation_results = aitertools.atee( 

719 aitertools.ensure_async_iterator(self._evaluation_results), 

720 2, 

721 lock=asyncio.Lock(), 

722 ) 

723 async for result in evaluation_results: 

724 yield result 

725 

726 async def astart(self) -> _AsyncExperimentManager: 

727 try: 

728 first_example = await aitertools.py_anext(await self.aget_examples()) 

729 except StopAsyncIteration: 

730 raise ValueError( 

731 "No examples found in the dataset. " 

732 "Please ensure the data provided to aevaluate is not empty." 

733 ) 

734 if not first_example: 

735 raise ValueError( 

736 "No examples found in the dataset." 

737 "Please ensure the data provided to aevaluate is not empty." 

738 ) 

739 project = self._get_project(first_example) if self._upload_results else None 

740 self._print_experiment_start(project, first_example) 

741 self._metadata["num_repetitions"] = self._num_repetitions 

742 return self._copy( 

743 await self.aget_examples(), 

744 experiment=project, 

745 ) 

746 

747 def _get_example_with_readers(self, example: schemas.Example) -> schemas.Example: 

748 new_attachments: dict[str, schemas.AttachmentInfo] = {} 

749 for name, attachment in (example.attachments or {}).items(): 

750 if ( 

751 self._attachment_raw_data_dict is not None 

752 and str(example.id) + name in self._attachment_raw_data_dict 

753 ): 

754 reader = io.BytesIO( 

755 self._attachment_raw_data_dict[str(example.id) + name] 

756 ) 

757 new_attachments[name] = { 

758 "presigned_url": attachment["presigned_url"], 

759 "reader": reader, 

760 "mime_type": attachment["mime_type"], 

761 } 

762 else: 

763 new_attachments[name] = attachment 

764 

765 return schemas.Example( 

766 id=example.id, 

767 created_at=example.created_at, 

768 dataset_id=example.dataset_id, 

769 inputs=example.inputs, 

770 outputs=example.outputs, 

771 metadata=example.metadata, 

772 modified_at=example.modified_at, 

773 source_run_id=example.source_run_id, 

774 attachments=new_attachments, 

775 _host_url=example._host_url, 

776 _tenant_id=example._tenant_id, 

777 ) 

778 

779 async def awith_predictions_and_evaluators( 

780 self, 

781 target: ATARGET_T, 

782 evaluators: Sequence[Union[EVALUATOR_T, AEVALUATOR_T]], 

783 /, 

784 max_concurrency: Optional[int] = None, 

785 ) -> _AsyncExperimentManager: 

786 """Run predictions and evaluations in a single pipeline. 

787 

788 This allows evaluators to process results as soon as they're available from 

789 the target function, rather than waiting for all predictions to complete first. 

790 """ 

791 evaluators = _resolve_evaluators(evaluators) 

792 

793 if not hasattr(self, "_evaluation_feedback_executor"): 

794 self._evaluation_feedback_executor = cf.ThreadPoolExecutor(max_workers=4) 

795 

796 traceable_target = _ensure_async_traceable(target) 

797 

798 async def process_example(example: schemas.Example): 

799 # Yield the coroutine to be awaited later 

800 pred = await _aforward( 

801 traceable_target, 

802 self._get_example_with_readers(example), 

803 self.experiment_name, 

804 self._metadata, 

805 self.client, 

806 _target_include_attachments(target), 

807 self._error_handling, 

808 ) 

809 example, run = pred["example"], pred["run"] 

810 result = await self._arun_evaluators( 

811 evaluators, 

812 { 

813 "run": run, 

814 "example": example, 

815 "evaluation_results": {"results": []}, 

816 }, 

817 feedback_executor=self._evaluation_feedback_executor, 

818 ) 

819 return result 

820 

821 async def process_examples(): 

822 """Create a single task per example. 

823 

824 That task is to run the target function and all the evaluators 

825 sequentially. 

826 """ 

827 async for example in await self.aget_examples(): 

828 yield process_example(example) 

829 

830 await self._aend() 

831 

832 # Run the per-example tasks with max-concurrency 

833 # This guarantees that max_concurrency is the upper limit 

834 # for the number of target/evaluators that can be run in parallel 

835 experiment_results = aitertools.aiter_with_concurrency( 

836 max_concurrency, 

837 process_examples(), 

838 _eager_consumption_timeout=0.001, 

839 ) 

840 

841 r1, r2, r3 = aitertools.atee(experiment_results, 3, lock=asyncio.Lock()) 

842 

843 return self._copy( 

844 (result["example"] async for result in r1), 

845 runs=(result["run"] async for result in r2), 

846 evaluation_results=(result["evaluation_results"] async for result in r3), 

847 ) 

848 

849 async def awith_predictions( 

850 self, 

851 target: ATARGET_T, 

852 /, 

853 max_concurrency: Optional[int] = None, 

854 ) -> _AsyncExperimentManager: 

855 _experiment_results = self._apredict( 

856 target, 

857 max_concurrency=max_concurrency, 

858 include_attachments=_target_include_attachments(target), 

859 ) 

860 r1, r2 = aitertools.atee(_experiment_results, 2, lock=asyncio.Lock()) 

861 return self._copy( 

862 (pred["example"] async for pred in r1), 

863 runs=(pred["run"] async for pred in r2), 

864 ) 

865 

866 async def awith_evaluators( 

867 self, 

868 evaluators: Sequence[Union[EVALUATOR_T, AEVALUATOR_T]], 

869 *, 

870 max_concurrency: Optional[int] = None, 

871 ) -> _AsyncExperimentManager: 

872 evaluators = _resolve_evaluators(evaluators) 

873 experiment_results = self._ascore(evaluators, max_concurrency=max_concurrency) 

874 r1, r2, r3 = aitertools.atee(experiment_results, 3, lock=asyncio.Lock()) 

875 return self._copy( 

876 (result["example"] async for result in r1), 

877 runs=(result["run"] async for result in r2), 

878 evaluation_results=(result["evaluation_results"] async for result in r3), 

879 ) 

880 

881 async def awith_summary_evaluators( 

882 self, 

883 summary_evaluators: Sequence[SUMMARY_EVALUATOR_T], 

884 ) -> _AsyncExperimentManager: 

885 wrapped_evaluators = _wrap_summary_evaluators(summary_evaluators) 

886 aggregate_feedback_gen = self._aapply_summary_evaluators(wrapped_evaluators) 

887 return self._copy( 

888 await self.aget_examples(), 

889 runs=self.aget_runs(), 

890 summary_results=aggregate_feedback_gen, 

891 ) 

892 

893 async def aget_results(self) -> AsyncIterator[ExperimentResultRow]: 

894 async for run, example, evaluation_results in aitertools.async_zip( 

895 self.aget_runs(), await self.aget_examples(), self.aget_evaluation_results() 

896 ): 

897 yield ExperimentResultRow( 

898 run=run, 

899 example=example, 

900 evaluation_results=evaluation_results, 

901 ) 

902 

903 async def aget_summary_scores(self) -> dict[str, list[dict]]: 

904 if self._summary_results is None: 

905 return {"results": []} 

906 return { 

907 "results": [ 

908 res # type: ignore[misc] 

909 async for results in self._summary_results 

910 for res in results["results"] 

911 ] 

912 } 

913 

914 ## Private methods 

915 

916 async def _apredict( 

917 self, 

918 target: ATARGET_T, 

919 /, 

920 max_concurrency: Optional[int] = None, 

921 include_attachments: bool = False, 

922 ) -> AsyncIterator[_ForwardResults]: 

923 fn = _ensure_async_traceable(target) 

924 

925 async def predict_all(): 

926 async for example in await self.aget_examples(): 

927 # Yield the coroutine to be awaited later 

928 yield _aforward( 

929 fn, 

930 self._get_example_with_readers(example), 

931 self.experiment_name, 

932 self._metadata, 

933 self.client, 

934 include_attachments, 

935 self._error_handling, 

936 ) 

937 

938 async for result in aitertools.aiter_with_concurrency( 

939 max_concurrency, predict_all(), _eager_consumption_timeout=0.001 

940 ): 

941 yield result 

942 

943 await self._aend() 

944 

945 async def _ascore( 

946 self, 

947 evaluators: Sequence[RunEvaluator], 

948 max_concurrency: Optional[int] = None, 

949 ) -> AsyncIterator[ExperimentResultRow]: 

950 with cf.ThreadPoolExecutor(max_workers=4) as feedback_executor: 

951 

952 async def score_all(): 

953 async for current_results in self.aget_results(): 

954 # Yield the coroutine to be awaited later in aiter_with_concurrency 

955 yield self._arun_evaluators( 

956 evaluators, current_results, feedback_executor=feedback_executor 

957 ) 

958 

959 async for result in aitertools.aiter_with_concurrency( 

960 max_concurrency, score_all(), _eager_consumption_timeout=0.001 

961 ): 

962 yield result 

963 

964 async def _arun_evaluators( 

965 self, 

966 evaluators: Sequence[RunEvaluator], 

967 current_results: ExperimentResultRow, 

968 feedback_executor: cf.ThreadPoolExecutor, 

969 ) -> ExperimentResultRow: 

970 current_context = rh.get_tracing_context() 

971 metadata = { 

972 **(current_context["metadata"] or {}), 

973 **{"experiment": self.experiment_name}, 

974 } 

975 with rh.tracing_context( 

976 **{ 

977 **current_context, 

978 "project_name": "evaluators", 

979 "metadata": metadata, 

980 "enabled": "local" if not self._upload_results else True, 

981 "client": self.client, 

982 } 

983 ): 

984 run = current_results["run"] 

985 example = current_results["example"] 

986 eval_results = current_results["evaluation_results"] 

987 

988 async def _run_single_evaluator(evaluator: RunEvaluator): 

989 evaluator_run_id = uuid.uuid4() 

990 try: 

991 evaluator_response = await evaluator.aevaluate_run( # type: ignore[call-arg] 

992 run=run, 

993 example=self._get_example_with_readers(example), 

994 evaluator_run_id=evaluator_run_id, 

995 ) 

996 selected_results = self.client._select_eval_results( 

997 evaluator_response 

998 ) 

999 

1000 if self._upload_results: 

1001 self.client._log_evaluation_feedback( 

1002 evaluator_response, run=run, _executor=feedback_executor 

1003 ) 

1004 return selected_results 

1005 except Exception as e: 

1006 try: 

1007 feedback_keys = _extract_feedback_keys(evaluator) 

1008 

1009 error_response = EvaluationResults( 

1010 results=[ 

1011 EvaluationResult( 

1012 key=key, 

1013 source_run_id=evaluator_run_id, 

1014 comment=repr(e), 

1015 extra={"error": True}, 

1016 ) 

1017 for key in feedback_keys 

1018 ] 

1019 ) 

1020 selected_results = self.client._select_eval_results( 

1021 error_response 

1022 ) 

1023 if self._upload_results: 

1024 self.client._log_evaluation_feedback( 

1025 error_response, run=run, _executor=feedback_executor 

1026 ) 

1027 return selected_results 

1028 except Exception as e2: 

1029 logger.debug(f"Error parsing feedback keys: {e2}") 

1030 pass 

1031 logger.error( 

1032 f"Error running evaluator {repr(evaluator)} on" 

1033 f" run {run.id}: {repr(e)}", 

1034 exc_info=True, 

1035 ) 

1036 

1037 all_results = [] 

1038 for evaluator in evaluators: 

1039 all_results.append(await _run_single_evaluator(evaluator)) 

1040 

1041 for result in all_results: 

1042 if result is not None: 

1043 eval_results["results"].extend(result) 

1044 return ExperimentResultRow( 

1045 run=run, 

1046 example=example, 

1047 evaluation_results=eval_results, 

1048 ) 

1049 

1050 async def _aapply_summary_evaluators( 

1051 self, summary_evaluators: Sequence[SUMMARY_EVALUATOR_T] 

1052 ) -> AsyncIterator[EvaluationResults]: 

1053 runs, examples = [], [] 

1054 async_examples = aitertools.ensure_async_iterator(await self.aget_examples()) 

1055 async for run, example in aitertools.async_zip( 

1056 self.aget_runs(), async_examples 

1057 ): 

1058 runs.append(run) 

1059 examples.append(example) 

1060 aggregate_feedback = [] 

1061 project_id = self._get_experiment().id if self._upload_results else None 

1062 current_context = rh.get_tracing_context() 

1063 metadata = { 

1064 **(current_context["metadata"] or {}), 

1065 **{ 

1066 "experiment": self.experiment_name, 

1067 "experiment_id": project_id, 

1068 }, 

1069 } 

1070 with rh.tracing_context( 

1071 **{ 

1072 **current_context, 

1073 "project_name": "evaluators", 

1074 "metadata": metadata, 

1075 "enabled": "local" if not self._upload_results else True, 

1076 "client": self.client, 

1077 } 

1078 ): 

1079 for evaluator in summary_evaluators: 

1080 try: 

1081 summary_eval_result = evaluator(runs, examples) 

1082 flattened_results = self.client._select_eval_results( 

1083 summary_eval_result, 

1084 fn_name=evaluator.__name__, 

1085 ) 

1086 aggregate_feedback.extend(flattened_results) 

1087 if self._upload_results: 

1088 for result in flattened_results: 

1089 feedback = result.dict(exclude={"target_run_id"}) 

1090 evaluator_info = feedback.pop("evaluator_info", None) 

1091 await aitertools.aio_to_thread( 

1092 self.client.create_feedback, 

1093 **feedback, 

1094 run_id=None, 

1095 project_id=project_id, 

1096 source_info=evaluator_info, 

1097 ) 

1098 except Exception as e: 

1099 logger.error( 

1100 f"Error running summary evaluator {repr(evaluator)}: {e}", 

1101 exc_info=True, 

1102 ) 

1103 yield {"results": aggregate_feedback} 

1104 

1105 async def _get_dataset_version(self) -> Optional[str]: 

1106 modified_at = [] 

1107 async for example in await self.aget_examples(): 

1108 if example.modified_at: 

1109 # Should always be defined in practice when fetched, 

1110 # but the typing permits None 

1111 modified_at.append(example.modified_at) 

1112 

1113 max_modified_at = max(modified_at) if modified_at else None 

1114 return max_modified_at.isoformat() if max_modified_at else None 

1115 

1116 async def _get_dataset_splits(self) -> Optional[list[str]]: 

1117 splits = set() 

1118 async for example in await self.aget_examples(): 

1119 if ( 

1120 example.metadata 

1121 and example.metadata.get("dataset_split") 

1122 and isinstance(example.metadata["dataset_split"], list) 

1123 ): 

1124 for split in example.metadata["dataset_split"]: 

1125 if isinstance(split, str): 

1126 splits.add(split) 

1127 else: 

1128 splits.add("base") 

1129 

1130 return list(splits) 

1131 

1132 async def _aend(self) -> None: 

1133 if not self._upload_results: 

1134 return 

1135 experiment = self._experiment 

1136 if experiment is None: 

1137 raise ValueError("Experiment not started yet.") 

1138 

1139 project_metadata = self._get_experiment_metadata() 

1140 project_metadata["dataset_version"] = await self._get_dataset_version() 

1141 project_metadata["dataset_splits"] = await self._get_dataset_splits() 

1142 self.client.update_project( 

1143 experiment.id, 

1144 metadata={ 

1145 **experiment.metadata, 

1146 **project_metadata, 

1147 }, 

1148 ) 

1149 

1150 def _copy(self, *args: Any, **kwargs: Any) -> _AsyncExperimentManager: 

1151 default_args = (self._data,) 

1152 default_kwargs = { 

1153 "experiment": self._experiment, 

1154 "metadata": self._metadata, 

1155 "runs": self._runs, 

1156 "client": self.client, 

1157 "evaluation_results": self._evaluation_results, 

1158 "summary_results": self._summary_results, 

1159 "include_attachments": self._include_attachments, 

1160 "reuse_attachments": self._reuse_attachments, 

1161 "upload_results": self._upload_results, 

1162 "attachment_raw_data_dict": self._attachment_raw_data_dict, 

1163 "error_handling": self._error_handling, 

1164 } 

1165 full_args = list(args) + list(default_args[len(args) :]) 

1166 full_kwargs = {**default_kwargs, **kwargs} 

1167 return self.__class__(*full_args, **full_kwargs) 

1168 

1169 

1170class AsyncExperimentResults: 

1171 def __init__( 

1172 self, 

1173 experiment_manager: _AsyncExperimentManager, 

1174 ): 

1175 self._manager = experiment_manager 

1176 self._results: list[ExperimentResultRow] = [] 

1177 self._lock = asyncio.Lock() 

1178 self._task = asyncio.create_task(self._process_data(self._manager)) 

1179 self._processed_count = 0 

1180 

1181 @property 

1182 def experiment_name(self) -> str: 

1183 return self._manager.experiment_name 

1184 

1185 def __aiter__(self) -> AsyncIterator[ExperimentResultRow]: 

1186 return self 

1187 

1188 async def __anext__(self) -> ExperimentResultRow: 

1189 async def _wait_until_index(index: int) -> None: 

1190 while self._processed_count < index: 

1191 await asyncio.sleep(0.05) 

1192 

1193 while True: 

1194 async with self._lock: 

1195 if self._processed_count < len(self._results): 

1196 result = self._results[self._processed_count] 

1197 self._processed_count += 1 

1198 return result 

1199 elif self._task.done(): 

1200 raise StopAsyncIteration 

1201 

1202 await asyncio.shield( 

1203 asyncio.wait_for(_wait_until_index(len(self._results)), timeout=None) 

1204 ) 

1205 

1206 async def _process_data(self, manager: _AsyncExperimentManager) -> None: 

1207 tqdm = _load_tqdm() 

1208 async for item in tqdm(manager.aget_results()): 

1209 async with self._lock: 

1210 self._results.append(item) 

1211 summary_scores = await manager.aget_summary_scores() 

1212 async with self._lock: 

1213 self._summary_results = summary_scores 

1214 

1215 def to_pandas( 

1216 self, start: Optional[int] = 0, end: Optional[int] = None 

1217 ) -> DataFrame: 

1218 return _to_pandas(self._results, start=start, end=end) 

1219 

1220 def _repr_html_(self) -> str: 

1221 import importlib.util 

1222 

1223 if self._results and importlib.util.find_spec("pandas"): 

1224 df = self.to_pandas(0, 5) 

1225 return df._repr_html_() # type: ignore[operator] 

1226 else: 

1227 return self.__repr__() 

1228 

1229 def __len__(self) -> int: 

1230 return len(self._results) 

1231 

1232 def __repr__(self) -> str: 

1233 return f"<AsyncExperimentResults {self.experiment_name}>" 

1234 

1235 async def wait(self) -> None: 

1236 await self._task 

1237 

1238 

1239async def _aforward( 

1240 fn: rh.SupportsLangsmithExtra[[dict], Awaitable], 

1241 example: schemas.Example, 

1242 experiment_name: str, 

1243 metadata: dict, 

1244 client: langsmith.Client, 

1245 include_attachments: bool = False, 

1246 error_handling: Literal["log", "ignore"] = "log", 

1247) -> _ForwardResults: 

1248 run: Optional[schemas.RunBase] = None 

1249 

1250 def _get_run(r: run_trees.RunTree) -> None: 

1251 nonlocal run 

1252 run = r 

1253 

1254 def _set_reference_example_id(r: rt.RunTree) -> None: 

1255 r.reference_example_id = example.id 

1256 

1257 langsmith_extra = rh.LangSmithExtra( 

1258 on_end=_get_run, 

1259 project_name=experiment_name, 

1260 metadata={ 

1261 **metadata, 

1262 "example_version": (example.modified_at or example.created_at).isoformat(), 

1263 }, 

1264 client=client, 

1265 ) 

1266 if error_handling == "log": 

1267 langsmith_extra["reference_example_id"] = example.id 

1268 elif error_handling == "ignore": 

1269 langsmith_extra["_on_success"] = _set_reference_example_id 

1270 else: 

1271 raise ValueError(f"Unrecognized error_handling value: {error_handling=}") 

1272 

1273 with rh.tracing_context(enabled=True): 

1274 try: 

1275 arg_names = _get_target_args(fn) 

1276 args = [getattr(example, argn) for argn in arg_names] 

1277 await fn(*args, langsmith_extra=langsmith_extra) 

1278 except Exception as e: 

1279 logger.error( 

1280 f"Error running target function: {e}", exc_info=True, stacklevel=1 

1281 ) 

1282 return _ForwardResults( 

1283 run=cast(schemas.Run, run), 

1284 example=example, 

1285 ) 

1286 

1287 

1288def _ensure_async_traceable( 

1289 target: ATARGET_T, 

1290) -> rh.SupportsLangsmithExtra[[dict], Awaitable]: 

1291 if not asyncio.iscoroutinefunction(target) and not _is_langchain_runnable(target): 

1292 if callable(target): 

1293 raise ValueError( 

1294 "Target must be an async function. For sync functions, use evaluate." 

1295 " Example usage:\n\n" 

1296 "async def predict(inputs: dict) -> dict:\n" 

1297 " # do work, like chain.invoke(inputs)\n" 

1298 " return {...}\n" 

1299 "await aevaluate(predict, ...)" 

1300 ) 

1301 else: 

1302 raise ValueError( 

1303 "Target must be a callable async function. " 

1304 "Received a non-callable object. Example usage:\n\n" 

1305 "async def predict(inputs: dict) -> dict:\n" 

1306 " # do work, like chain.invoke(inputs)\n" 

1307 " return {...}\n" 

1308 "await aevaluate(predict, ...)" 

1309 ) 

1310 if rh.is_traceable_function(target): 

1311 return target # type: ignore 

1312 else: 

1313 if _is_langchain_runnable(target): 

1314 target = target.ainvoke # type: ignore[union-attr] 

1315 return rh.traceable(name="AsyncTarget")(target) # type: ignore[arg-type] 

1316 

1317 

1318def _aresolve_data( 

1319 data: Union[DATA_T, AsyncIterable[schemas.Example]], 

1320 *, 

1321 client: langsmith.Client, 

1322 include_attachments: bool = False, 

1323) -> AsyncIterator[schemas.Example]: 

1324 """Return the examples for the given dataset.""" 

1325 if isinstance(data, AsyncIterable): 

1326 return aitertools.ensure_async_iterator(data) 

1327 return aitertools.ensure_async_iterator( 

1328 _resolve_data(data, client=client, include_attachments=include_attachments) 

1329 ) 

1330 

1331 

1332T = TypeVar("T") 

1333 

1334 

1335async def async_chain_from_iterable( 

1336 iterable: Iterable[AsyncIterable[T]], 

1337) -> AsyncIterator[T]: 

1338 """Chain multiple async iterables.""" 

1339 for sub_iterable in iterable: 

1340 async for item in sub_iterable: 

1341 yield item 

1342 

1343 

1344async def async_iter_from_list( 

1345 examples: list[schemas.Example], 

1346) -> AsyncIterable[schemas.Example]: 

1347 """Convert a list of examples to an async iterable.""" 

1348 for example in examples: 

1349 yield example