Coverage for langsmith/evaluation/evaluator.py: 44%

376 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-12-11 16:15 -0800

1"""This module contains the evaluator classes for evaluating runs.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import inspect 

7import uuid 

8from abc import abstractmethod 

9from collections.abc import Awaitable, Sequence 

10from typing import ( 

11 Any, 

12 Callable, 

13 Literal, 

14 Optional, 

15 Union, 

16 cast, 

17) 

18 

19from typing_extensions import TypedDict 

20 

21from langsmith import run_helpers as rh 

22from langsmith import schemas 

23 

24try: 

25 from pydantic.v1 import ( # type: ignore[import] 

26 BaseModel, 

27 Field, 

28 ValidationError, 

29 validator, 

30 ) 

31except ImportError: 

32 from pydantic import ( # type: ignore[assignment] 

33 BaseModel, 

34 Field, 

35 ValidationError, 

36 validator, 

37 ) 

38 

39import logging 

40from functools import wraps 

41 

42from langsmith.schemas import SCORE_TYPE, VALUE_TYPE, Example, Run 

43 

44logger = logging.getLogger(__name__) 

45 

46 

class Category(TypedDict):
    """One selectable option of a categorical feedback type."""

    value: Optional[Union[float, int]]
    """Numeric score or ordinal associated with this category (may be `None`)."""
    label: str
    """Display label of this category."""

54 

55 

class FeedbackConfig(TypedDict, total=False):
    """Describe the type and bounds of a kind of feedback.

    Applied on the first creation of a `feedback_key`.
    """

    type: Literal["continuous", "categorical", "freeform"]
    """Which kind of feedback this is."""
    min: Optional[Union[float, int]]
    """Lowest permitted value (continuous type only)."""
    max: Optional[Union[float, int]]
    """Highest permitted value (continuous type only)."""
    categories: Optional[list[Union[Category, dict]]]
    """Available categories (presumably used by the categorical type)."""

69 

70 

class EvaluationResult(BaseModel):
    """Evaluation result."""

    key: str
    """Name of the metric, aspect, or label being reported."""
    score: SCORE_TYPE = None
    """Numeric score of the evaluation."""
    value: VALUE_TYPE = None
    """Non-numeric value of the evaluation."""
    comment: Optional[str] = None
    """Free-text explanation of the evaluation."""
    correction: Optional[dict] = None
    """The correct value, when applicable."""
    evaluator_info: dict = Field(default_factory=dict)
    """Extra information about the evaluator."""
    feedback_config: Optional[Union[FeedbackConfig, dict]] = None
    """Configuration used to generate this feedback."""
    source_run_id: Optional[Union[uuid.UUID, str]] = None
    """ID of the trace of the evaluator itself."""
    target_run_id: Optional[Union[uuid.UUID, str]] = None
    """ID of the trace this evaluation is applied to.

    If none is provided, the feedback is applied to the root trace
    being evaluated.
    """
    extra: Optional[dict] = None
    """Metadata for the evaluator run."""

    class Config:
        """Pydantic model configuration."""

        # NOTE(review): pydantic documents `extra` as the setting that controls
        # unknown fields; `allow_extra` may be ignored. Confirm before relying
        # on extra keys being rejected. Left unchanged to preserve behavior.
        allow_extra = False

    @validator("value", pre=True)
    def check_value_non_numeric(cls, v, values):
        """Warn when a numeric `value` arrives without a `score`.

        The value is never altered; a numeric `value` with no `score` most
        likely means the caller meant to populate `score`.
        """
        no_score_given = values.get("score") is None
        if no_score_given and isinstance(v, (int, float)):
            logger.warning(
                "Numeric values should be provided in"
                " the 'score' field, not 'value'."
                f" Got: {v}"
            )
        return v

116 

117 

class EvaluationResults(TypedDict, total=False):
    """A batch of evaluation results.

    Lets a single evaluator report several metrics in one return value.
    """

    results: list[EvaluationResult]
    """The individual evaluation results."""

127 

128 

class RunEvaluator:
    """Evaluator interface class."""

    @abstractmethod
    def evaluate_run(
        self,
        run: Run,
        example: Optional[Example] = None,
        evaluator_run_id: Optional[uuid.UUID] = None,
    ) -> Union[EvaluationResult, EvaluationResults]:
        """Evaluate an example."""

    async def aevaluate_run(
        self,
        run: Run,
        example: Optional[Example] = None,
        evaluator_run_id: Optional[uuid.UUID] = None,
    ) -> Union[EvaluationResult, EvaluationResults]:
        """Evaluate an example asynchronously.

        Runs the synchronous `evaluate_run` in the running loop's default
        executor, re-entering the tracing context captured at call time.
        """
        # Capture the context here: the executor thread does not inherit it.
        captured_context = rh.get_tracing_context()

        def _evaluate_in_worker():
            with rh.tracing_context(**captured_context):
                return self.evaluate_run(run, example, evaluator_run_id)

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _evaluate_in_worker)

155 

156 

# Shapes a single-run evaluator function may return before coercion.
_RUNNABLE_OUTPUT = Union[
    EvaluationResult,
    EvaluationResults,
    dict,
]

158 

159 

class ComparisonEvaluationResult(BaseModel):
    """Feedback scores for the results of comparative evaluations.

    These are generated by functions that compare two or more runs,
    returning a ranking or other feedback.
    """

    key: str
    """Name of the metric, aspect, or label being reported."""
    scores: dict[Union[uuid.UUID, str], SCORE_TYPE]
    """Score of each compared run, keyed by run ID."""
    source_run_id: Optional[Union[uuid.UUID, str]] = None
    """ID of the trace of the evaluator itself."""
    comment: Optional[Union[str, dict[Union[uuid.UUID, str], str]]] = None
    """Comment for the scores.

    A string is shared across all target runs; a `dict` maps run IDs to
    individual comments.
    """

178 

179 

# Shapes a comparison evaluator function may return before coercion.
_COMPARISON_OUTPUT = Union[
    ComparisonEvaluationResult,
    dict,
]

181 

182 

class DynamicRunEvaluator(RunEvaluator):
    """A dynamic evaluator that wraps a function and transforms it into a `RunEvaluator`.

    This class is designed to be used with the `@run_evaluator` decorator, allowing
    functions that take a `Run` and an optional `Example` as arguments, and return
    an `EvaluationResult` or `EvaluationResults`, to be used as instances of `RunEvaluator`.

    Attributes:
        func (Callable): The traceable sync function wrapped by this evaluator.
            Only set when the given `func` is not a coroutine function.
        afunc (Callable): The traceable async function wrapped by this evaluator.
            Only set when `afunc` is given or `func` is a coroutine function.
    """  # noqa: E501

    def __init__(
        self,
        func: Callable[
            [Run, Optional[Example]],
            Union[_RUNNABLE_OUTPUT, Awaitable[_RUNNABLE_OUTPUT]],
        ],
        # Async function to be used for async evaluation. Optional
        afunc: Optional[
            Callable[
                [Run, Optional[Example]],
                Awaitable[_RUNNABLE_OUTPUT],
            ]
        ] = None,
    ):
        """Initialize the `DynamicRunEvaluator` with a given function.

        Args:
            func (Callable): A function that takes a `Run` and an optional `Example`
                as arguments, and returns an `EvaluationResult`, `EvaluationResults`
                or a dict. If it is a coroutine function it becomes the async
                implementation.
            afunc (Optional[Callable]): Optional async counterpart of `func`.

        Raises:
            TypeError: If `func` is a coroutine function and `afunc` is also given.
            ValueError: If a function signature is rejected by
                `_normalize_evaluator_func`.
        """
        (func, prepare_inputs) = _normalize_evaluator_func(func)
        if afunc:
            (afunc, prepare_inputs) = _normalize_evaluator_func(afunc)  # type: ignore[assignment]

        def process_inputs(inputs: dict) -> dict:
            # Trace the arguments the user function actually receives, when the
            # function was adapted to named arguments.
            if prepare_inputs is None:
                return inputs
            (_, _, traced_inputs) = prepare_inputs(
                inputs.get("run"), inputs.get("example")
            )
            return traced_inputs

        # Copy the wrapped function's metadata (name, docstring, ...) onto self.
        wraps(func)(self)
        from langsmith import run_helpers  # type: ignore

        if afunc is not None:
            self.afunc = run_helpers.ensure_traceable(
                afunc, process_inputs=process_inputs
            )
            self._name = getattr(afunc, "__name__", "DynamicRunEvaluator")
        if inspect.iscoroutinefunction(func):
            if afunc is not None:
                raise TypeError(
                    "Func was provided as a coroutine function, but afunc was "
                    "also provided. If providing both, func should be a regular "
                    "function to avoid ambiguity."
                )
            self.afunc = run_helpers.ensure_traceable(
                func, process_inputs=process_inputs
            )
            self._name = getattr(func, "__name__", "DynamicRunEvaluator")
        else:
            self.func = run_helpers.ensure_traceable(
                cast(Callable[[Run, Optional[Example]], _RUNNABLE_OUTPUT], func),
                process_inputs=process_inputs,
            )
            self._name = getattr(func, "__name__", "DynamicRunEvaluator")

    def _coerce_evaluation_result(
        self,
        result: Union[EvaluationResult, dict],
        source_run_id: uuid.UUID,
        allow_no_key: bool = False,
    ) -> EvaluationResult:
        """Turn one raw result into an `EvaluationResult`.

        Args:
            result: An `EvaluationResult` or a dict of its fields.
            source_run_id: Evaluator run ID, used when the result has none.
            allow_no_key: If `True`, a dict without `"key"` gets this
                evaluator's name as its key.

        Raises:
            ValueError: If the dict is empty, has none of `score` / `value` /
                `comment`, or fails model validation.
        """
        if isinstance(result, EvaluationResult):
            if not result.source_run_id:
                result.source_run_id = source_run_id
            return result
        try:
            if not result:
                raise ValueError(
                    "Expected an EvaluationResult object, or dict with a metric"
                    f" 'key' and optional 'score'; got empty result: {result}"
                )
            if "key" not in result and allow_no_key:
                # Work on a copy: the dict belongs to the user's evaluator and
                # may be reused across calls or across evaluators.
                result = {**result, "key": self._name}
            if all(k not in result for k in ("score", "value", "comment")):
                raise ValueError(
                    "Expected an EvaluationResult object, or dict with a metric"
                    f" 'key' and optional 'score' or categorical 'value'; got {result}"
                )
            return EvaluationResult(**{"source_run_id": source_run_id, **result})
        except ValidationError as e:
            raise ValueError(
                "Expected an EvaluationResult object, or dict with a metric"
                f" 'key' and optional 'score'; got {result}"
            ) from e

    def _coerce_evaluation_results(
        self,
        results: Union[dict, EvaluationResults],
        source_run_id: uuid.UUID,
    ) -> Union[EvaluationResult, EvaluationResults]:
        """Coerce a dict into a single result or a batch of results.

        A dict with a `"results"` entry is treated as a batch and each item is
        coerced; any other dict is treated as a single result.
        """
        if "results" in results:
            cp = results.copy()
            cp["results"] = [
                self._coerce_evaluation_result(r, source_run_id=source_run_id)
                for r in results["results"]
            ]
            return EvaluationResults(**cp)

        return self._coerce_evaluation_result(
            cast(dict, results), source_run_id=source_run_id, allow_no_key=True
        )

    def _format_result(
        self,
        result: Union[
            EvaluationResult, EvaluationResults, dict, str, int, bool, float, list
        ],
        source_run_id: uuid.UUID,
    ) -> Union[EvaluationResult, EvaluationResults]:
        """Normalize whatever the wrapped function returned.

        `EvaluationResult` objects pass through (gaining `source_run_id` if
        unset); everything else goes through `_format_evaluator_result` and
        is then coerced.
        """
        if isinstance(result, EvaluationResult):
            if not result.source_run_id:
                result.source_run_id = source_run_id
            return result
        result = _format_evaluator_result(result)
        return self._coerce_evaluation_results(result, source_run_id)

    @property
    def is_async(self) -> bool:
        """Check if the evaluator function is asynchronous.

        Returns:
            bool: `True` if the evaluator function is asynchronous, `False` otherwise.
        """
        return hasattr(self, "afunc")

    def evaluate_run(
        self,
        run: Run,
        example: Optional[Example] = None,
        evaluator_run_id: Optional[uuid.UUID] = None,
    ) -> Union[EvaluationResult, EvaluationResults]:
        """Evaluate a run using the wrapped function.

        This method directly invokes the wrapped function with the provided arguments.
        If only an async function was wrapped, it is driven to completion on the
        current (non-running) event loop.

        Args:
            run (Run): The run to be evaluated.
            example (Optional[Example]): An optional example to be used in the evaluation.
            evaluator_run_id (Optional[uuid.UUID]): Run ID for the evaluator's own
                trace; a new one is generated when omitted.

        Returns:
            Union[EvaluationResult, EvaluationResults]: The result of the evaluation.

        Raises:
            RuntimeError: If only an async function is available and an event
                loop is already running.
        """  # noqa: E501
        if not hasattr(self, "func"):
            running_loop = asyncio.get_event_loop()
            if running_loop.is_running():
                raise RuntimeError(
                    "Cannot call `evaluate_run` on an async run evaluator from"
                    " within an running event loop. Use `aevaluate_run` instead."
                )
            else:
                # Forward evaluator_run_id so the caller-chosen ID is kept.
                return running_loop.run_until_complete(
                    self.aevaluate_run(run, example, evaluator_run_id)
                )
        if evaluator_run_id is None:
            evaluator_run_id = uuid.uuid4()
        metadata: dict[str, Any] = {"target_run_id": run.id}
        if getattr(run, "session_id", None):
            metadata["experiment"] = str(run.session_id)
        result = self.func(
            run,
            example,
            langsmith_extra={"run_id": evaluator_run_id, "metadata": metadata},
        )
        return self._format_result(result, evaluator_run_id)

    async def aevaluate_run(
        self,
        run: Run,
        example: Optional[Example] = None,
        evaluator_run_id: Optional[uuid.UUID] = None,
    ) -> Union[EvaluationResult, EvaluationResults]:
        """Evaluate a run asynchronously using the wrapped async function.

        This method directly invokes the wrapped async function with the
        provided arguments. If only a sync function was wrapped, it falls back
        to the base-class implementation.

        Args:
            run (Run): The run to be evaluated.
            example (Optional[Example]): An optional example to be used
                in the evaluation.
            evaluator_run_id (Optional[uuid.UUID]): Run ID for the evaluator's own
                trace; a new one is generated when omitted.

        Returns:
            Union[EvaluationResult, EvaluationResults]: The result of the evaluation.
        """
        if not hasattr(self, "afunc"):
            # Forward evaluator_run_id so the caller-chosen ID is kept.
            return await super().aevaluate_run(run, example, evaluator_run_id)
        if evaluator_run_id is None:
            evaluator_run_id = uuid.uuid4()
        metadata: dict[str, Any] = {"target_run_id": run.id}
        if getattr(run, "session_id", None):
            metadata["experiment"] = str(run.session_id)
        result = await self.afunc(
            run,
            example,
            langsmith_extra={"run_id": evaluator_run_id, "metadata": metadata},
        )
        return self._format_result(result, evaluator_run_id)

    def __call__(
        self, run: Run, example: Optional[Example] = None
    ) -> Union[EvaluationResult, EvaluationResults]:
        """Make the evaluator callable, allowing it to be used like a function.

        This method enables the evaluator instance to be called directly, forwarding the
        call to `evaluate_run`.

        Args:
            run (Run): The run to be evaluated.
            example (Optional[Example]): An optional example to be used in the evaluation.

        Returns:
            Union[EvaluationResult, EvaluationResults]: The result of the evaluation.
        """  # noqa: E501
        return self.evaluate_run(run, example)

    def __repr__(self) -> str:
        """Represent the DynamicRunEvaluator object."""
        return f"<DynamicRunEvaluator {self._name}>"

413 

414 

def run_evaluator(
    func: Callable[
        [Run, Optional[Example]], Union[_RUNNABLE_OUTPUT, Awaitable[_RUNNABLE_OUTPUT]]
    ],
):
    """Wrap a function in a `DynamicRunEvaluator`.

    Intended for use as a decorator: the decorated function is replaced by a
    `RunEvaluator` instance built around it.
    """
    evaluator = DynamicRunEvaluator(func)
    return evaluator

425 

426 

# Upper bound, in characters, on the repr strings produced by `_maxsize_repr`.
_MAXSIZE = 10_000


def _maxsize_repr(obj: Any):
    """Return `repr(obj)`, truncated to at most `_MAXSIZE` characters.

    An over-long repr is cut and terminated with `"...)"` so the result is
    exactly `_MAXSIZE` characters long.
    """
    text = repr(obj)
    if len(text) <= _MAXSIZE:
        return text
    return text[: _MAXSIZE - 4] + "...)"

435 

436 

class DynamicComparisonRunEvaluator:
    """Compare predictions (as traces) from 2 or more runs."""

    def __init__(
        self,
        func: Callable[
            [Sequence[Run], Optional[Example]],
            Union[_COMPARISON_OUTPUT, Awaitable[_COMPARISON_OUTPUT]],
        ],
        # Async function to be used for async evaluation. Optional
        afunc: Optional[
            Callable[
                [Sequence[Run], Optional[Example]],
                Awaitable[_COMPARISON_OUTPUT],
            ]
        ] = None,
    ):
        """Initialize the `DynamicComparisonRunEvaluator` with a given function.

        Args:
            func (Callable): A function that takes a sequence of `Run` objects and
                an optional `Example` as arguments, and returns a dict, a list of
                scores, or a `ComparisonEvaluationResult`. If it is a coroutine
                function it becomes the async implementation.
            afunc (Optional[Callable]): Optional async counterpart of `func`.

        Raises:
            TypeError: If `func` is a coroutine function and `afunc` is also given.
            ValueError: If a function signature is rejected by
                `_normalize_comparison_evaluator_func`.
        """
        (func, prepare_inputs) = _normalize_comparison_evaluator_func(func)
        if afunc:
            (afunc, prepare_inputs) = _normalize_comparison_evaluator_func(afunc)  # type: ignore[assignment]

        def process_inputs(inputs: dict) -> dict:
            # Trace the arguments the user function actually receives, when the
            # function was adapted to named arguments.
            if prepare_inputs is None:
                return inputs
            (_, _, traced_inputs) = prepare_inputs(
                inputs.get("runs"), inputs.get("example")
            )
            return traced_inputs

        # Copy the wrapped function's metadata (name, docstring, ...) onto self.
        wraps(func)(self)
        from langsmith import run_helpers  # type: ignore

        if afunc is not None:
            self.afunc = run_helpers.ensure_traceable(
                afunc, process_inputs=process_inputs
            )
            self._name = getattr(afunc, "__name__", "DynamicRunEvaluator")
        if inspect.iscoroutinefunction(func):
            if afunc is not None:
                raise TypeError(
                    "Func was provided as a coroutine function, but afunc was "
                    "also provided. If providing both, func should be a regular "
                    "function to avoid ambiguity."
                )
            self.afunc = run_helpers.ensure_traceable(
                func, process_inputs=process_inputs
            )
            self._name = getattr(func, "__name__", "DynamicRunEvaluator")
        else:
            self.func = run_helpers.ensure_traceable(
                cast(
                    Callable[
                        [Sequence[Run], Optional[Example]],
                        _COMPARISON_OUTPUT,
                    ],
                    func,
                ),
                process_inputs=process_inputs,
            )
            self._name = getattr(func, "__name__", "DynamicRunEvaluator")

    @property
    def is_async(self) -> bool:
        """Check if the evaluator function is asynchronous.

        Returns:
            bool: `True` if the evaluator function is asynchronous, `False` otherwise.
        """
        return hasattr(self, "afunc")

    def compare_runs(
        self, runs: Sequence[Run], example: Optional[Example] = None
    ) -> ComparisonEvaluationResult:
        """Compare runs to score preferences.

        If only an async function was wrapped, it is driven to completion on
        the current (non-running) event loop.

        Args:
            runs: A list of runs to compare.
            example: An optional example to be used in the evaluation.

        Returns:
            ComparisonEvaluationResult: The result of the comparison.

        Raises:
            RuntimeError: If only an async function is available and an event
                loop is already running.
        """  # noqa: E501
        if not hasattr(self, "func"):
            running_loop = asyncio.get_event_loop()
            if running_loop.is_running():
                raise RuntimeError(
                    "Cannot call `compare_runs` on an async comparison evaluator"
                    " from within a running event loop. Use `acompare_runs`"
                    " instead."
                )
            else:
                return running_loop.run_until_complete(
                    self.acompare_runs(runs, example)
                )
        source_run_id = uuid.uuid4()
        tags = self._get_tags(runs)
        # TODO: Add metadata for the "comparison experiment" here
        result = self.func(
            runs,
            example,
            langsmith_extra={"run_id": source_run_id, "tags": tags},
        )
        return self._format_results(result, source_run_id, runs)

    async def acompare_runs(
        self, runs: Sequence[Run], example: Optional[Example] = None
    ) -> ComparisonEvaluationResult:
        """Compare runs asynchronously using the wrapped async function.

        This method directly invokes the wrapped async function with the
        provided arguments. If only a sync function was wrapped, it calls
        `compare_runs` directly.

        Args:
            runs (Sequence[Run]): The runs to be compared.
            example (Optional[Example]): An optional example to be used
                in the evaluation.

        Returns:
            ComparisonEvaluationResult: The result of the evaluation.
        """
        if not hasattr(self, "afunc"):
            return self.compare_runs(runs, example)
        source_run_id = uuid.uuid4()
        tags = self._get_tags(runs)
        # TODO: Add metadata for the "comparison experiment" here
        result = await self.afunc(
            runs,
            example,
            langsmith_extra={"run_id": source_run_id, "tags": tags},
        )
        return self._format_results(result, source_run_id, runs)

    def __call__(
        self, runs: Sequence[Run], example: Optional[Example] = None
    ) -> ComparisonEvaluationResult:
        """Make the evaluator callable, allowing it to be used like a function.

        This method enables the evaluator instance to be called directly, forwarding the
        call to `compare_runs`.

        Args:
            runs (Sequence[Run]): The runs to be compared.
            example (Optional[Example]): An optional example to be used in the evaluation.

        Returns:
            ComparisonEvaluationResult: The result of the evaluation.
        """  # noqa: E501
        return self.compare_runs(runs, example)

    def __repr__(self) -> str:
        """Represent the DynamicComparisonRunEvaluator object."""
        return f"<DynamicComparisonRunEvaluator {self._name}>"

    @staticmethod
    def _get_tags(runs: Sequence[Run]) -> list[str]:
        """Extract tags from runs."""
        # Add tags to support filtering
        tags = []
        for run in runs:
            tags.append("run:" + str(run.id))
            if getattr(run, "session_id", None):
                tags.append("experiment:" + str(run.session_id))
        return tags

    def _format_results(
        self,
        result: Union[dict, list, ComparisonEvaluationResult],
        source_run_id: uuid.UUID,
        runs: Sequence[Run],
    ) -> ComparisonEvaluationResult:
        """Normalize the wrapped function's output.

        Args:
            result: A `ComparisonEvaluationResult`, a dict of its fields, or a
                list of scores paired positionally with `runs`.
            source_run_id: Evaluator run ID, used when the result has none.
            runs: The compared runs, used to key list-style scores by run ID.

        Raises:
            ValueError: If `result` has an unsupported type or fails model
                validation.
        """
        if isinstance(result, ComparisonEvaluationResult):
            if not result.source_run_id:
                result.source_run_id = source_run_id
            return result
        elif isinstance(result, list):
            # Scores pair with runs by position; zip stops at the shorter one.
            result = {
                "scores": {run.id: score for run, score in zip(runs, result)},
                "key": self._name,
                "source_run_id": source_run_id,
            }
        elif isinstance(result, dict):
            if "key" not in result:
                # Work on a copy: the dict belongs to the user's evaluator.
                result = {**result, "key": self._name}
        else:
            msg = (
                "Expected 'dict', 'list' or 'ComparisonEvaluationResult' result "
                f"object. Received: {result=}"
            )
            raise ValueError(msg)
        try:
            return ComparisonEvaluationResult(
                **{"source_run_id": source_run_id, **result}
            )
        except ValidationError as e:
            raise ValueError(
                "Expected a dictionary with a 'key' and dictionary of scores mapping"
                " run IDs to numeric scores, or ComparisonEvaluationResult object,"
                f" got {result}"
            ) from e

639 

640 

def comparison_evaluator(
    func: Callable[
        [Sequence[Run], Optional[Example]],
        Union[_COMPARISON_OUTPUT, Awaitable[_COMPARISON_OUTPUT]],
    ],
) -> DynamicComparisonRunEvaluator:
    """Wrap a function in a `DynamicComparisonRunEvaluator`."""
    evaluator = DynamicComparisonRunEvaluator(func)
    return evaluator

649 

650 

def _normalize_evaluator_func(
    func: Callable,
) -> tuple[
    Union[
        Callable[[Run, Optional[Example]], _RUNNABLE_OUTPUT],
        Callable[[Run, Optional[Example]], Awaitable[_RUNNABLE_OUTPUT]],
    ],
    Optional[Callable[..., dict]],
]:
    """Adapt a user evaluator to the `(run, example)` calling convention.

    Args:
        func: The user's evaluator. Its parameters are either two custom-named
            positional arguments (treated as run and example) or any of the
            supported names listed below; extra parameters need defaults.

    Returns:
        A `(callable, prepare_inputs)` pair. When `func` already takes
        `(run, example)` or uses custom names, it is returned unchanged with
        `None`. Otherwise the callable is a sync or async wrapper matching
        `func`, and `prepare_inputs(run, example)` returns
        `(args, kwargs, traced_inputs)` for the underlying call.

    Raises:
        ValueError: If `func` has no arguments, or has unsupported required
            arguments and does not have exactly two required arguments.
    """
    supported_args = (
        "run",
        "example",
        "inputs",
        "outputs",
        "reference_outputs",
        "attachments",
    )
    sig = inspect.signature(func)
    all_args = [pname for pname, p in sig.parameters.items() if p.kind != p.VAR_KEYWORD]
    args_with_defaults = [
        pname
        for pname, p in sig.parameters.items()
        if p.default is not inspect.Parameter.empty
    ]
    required_args = [a for a in all_args if a not in args_with_defaults]
    only_known_args = all(
        pname in supported_args or pname in args_with_defaults for pname in all_args
    )
    if not all_args or (not only_known_args and len(required_args) != 2):
        msg = (
            f"Invalid evaluator function. Must have at least one "
            f"argument. Supported arguments are {supported_args}. Please "
            f"see https://docs.smith.langchain.com/evaluation/how_to_guides/evaluation/evaluate_llm_application#use-custom-evaluators"
            # noqa: E501
        )
        raise ValueError(msg)
    # For backwards compatibility we assume custom arg names are Run and Example
    # types, respectively.
    if not only_known_args or all_args == ["run", "example"]:
        return func, None

    def _prepare_inputs(
        run: Run, example: Optional[Example]
    ) -> tuple[list, dict, dict]:
        """Build positional args, keyword args and traced inputs for `func`."""
        arg_map = {
            "run": run,
            "example": example,
            "inputs": example.inputs if example else {},
            "outputs": run.outputs or {},
            "attachments": example.attachments or {} if example else {},
            "reference_outputs": example.outputs or {} if example else {},
        }
        kwargs = {}
        args = []
        traced_inputs = {}
        for param_name, param in sig.parameters.items():
            # Could have params with defaults that are not in the arg map
            if param_name in arg_map:
                if param.kind in (
                    param.POSITIONAL_OR_KEYWORD,
                    param.POSITIONAL_ONLY,
                ):
                    args.append(arg_map[param_name])
                else:
                    kwargs[param_name] = arg_map[param_name]
                # Whole run/example objects are traced as size-capped reprs.
                traced_inputs[param_name] = (
                    _maxsize_repr(arg_map[param_name])
                    if param_name in ("run", "example")
                    else arg_map[param_name]
                )
        return args, kwargs, traced_inputs

    if inspect.iscoroutinefunction(func):

        async def awrapper(
            run: Run, example: Optional[Example]
        ) -> _RUNNABLE_OUTPUT:
            (args, kwargs, _) = _prepare_inputs(run, example)
            return await func(*args, **kwargs)

        normalized: Callable = awrapper
    else:

        def wrapper(run: Run, example: Optional[Example]) -> _RUNNABLE_OUTPUT:
            (args, kwargs, _) = _prepare_inputs(run, example)
            return func(*args, **kwargs)

        normalized = wrapper

    # Keep the user's function name so it is what tracing/feedback keys see.
    normalized.__name__ = getattr(func, "__name__", normalized.__name__)
    return (normalized, _prepare_inputs)  # type: ignore[return-value]

787 

788 

def _normalize_comparison_evaluator_func(
    func: Callable,
) -> tuple[
    Union[
        Callable[[Sequence[Run], Optional[Example]], _COMPARISON_OUTPUT],
        Callable[[Sequence[Run], Optional[Example]], Awaitable[_COMPARISON_OUTPUT]],
    ],
    Optional[Callable[..., dict]],
]:
    """Adapt a user comparison evaluator to the `(runs, example)` convention.

    Args:
        func: The user's evaluator. Its parameters are either two custom-named
            positional arguments (treated as runs and example) or any of the
            supported names listed below; extra parameters need defaults.

    Returns:
        A `(callable, prepare_inputs)` pair. When `func` already takes
        `(runs, example)` or uses custom names, it is returned unchanged with
        `None`. Otherwise the callable is a sync or async wrapper matching
        `func`, and `prepare_inputs(runs, example)` returns
        `(args, kwargs, traced_inputs)` for the underlying call.

    Raises:
        ValueError: If `func` has no arguments, or has unsupported required
            arguments and does not have exactly two required arguments.
    """
    supported_args = ("runs", "example", "inputs", "outputs", "reference_outputs")
    sig = inspect.signature(func)
    all_args = [pname for pname, p in sig.parameters.items() if p.kind != p.VAR_KEYWORD]
    args_with_defaults = [
        pname
        for pname, p in sig.parameters.items()
        if p.default is not inspect.Parameter.empty
    ]
    required_args = [a for a in all_args if a not in args_with_defaults]
    only_known_args = all(
        pname in supported_args or pname in args_with_defaults for pname in all_args
    )
    if not all_args or (not only_known_args and len(required_args) != 2):
        msg = (
            f"Invalid evaluator function. Must have at least one "
            f"argument. Supported arguments are {supported_args}. Please "
            f"see https://docs.smith.langchain.com/evaluation/how_to_guides/evaluation/evaluate_llm_application#use-custom-evaluators"
            # noqa: E501
        )
        raise ValueError(msg)
    # For backwards compatibility we assume custom arg names are List[Run] and
    # List[Example] types, respectively.
    if not only_known_args or all_args == ["runs", "example"]:
        return func, None

    def _prepare_inputs(
        runs: Sequence[Run], example: Optional[Example]
    ) -> tuple[list, dict, dict]:
        """Build positional args, keyword args and traced inputs for `func`."""
        arg_map = {
            "runs": runs,
            "example": example,
            "inputs": example.inputs if example else {},
            "outputs": [run.outputs or {} for run in runs],
            "reference_outputs": example.outputs or {} if example else {},
        }
        kwargs = {}
        args = []
        traced_inputs = {}
        for param_name, param in sig.parameters.items():
            # Could have params with defaults that are not in the arg map
            if param_name in arg_map:
                if param.kind in (
                    param.POSITIONAL_OR_KEYWORD,
                    param.POSITIONAL_ONLY,
                ):
                    args.append(arg_map[param_name])
                else:
                    kwargs[param_name] = arg_map[param_name]
                # Whole runs/example objects are traced as size-capped reprs.
                traced_inputs[param_name] = (
                    _maxsize_repr(arg_map[param_name])
                    if param_name in ("runs", "example")
                    else arg_map[param_name]
                )
        return args, kwargs, traced_inputs

    if inspect.iscoroutinefunction(func):

        async def awrapper(
            runs: Sequence[Run], example: Optional[Example]
        ) -> _COMPARISON_OUTPUT:
            (args, kwargs, _) = _prepare_inputs(runs, example)
            return await func(*args, **kwargs)

        normalized: Callable = awrapper
    else:

        def wrapper(
            runs: Sequence[Run], example: Optional[Example]
        ) -> _COMPARISON_OUTPUT:
            (args, kwargs, _) = _prepare_inputs(runs, example)
            return func(*args, **kwargs)

        normalized = wrapper

    # Keep the user's function name so it is what tracing/feedback keys see.
    normalized.__name__ = getattr(func, "__name__", normalized.__name__)
    return normalized, _prepare_inputs  # type: ignore[return-value]

918 

919 

def _format_evaluator_result(
    result: Union[EvaluationResults, dict, str, int, bool, float, list],
) -> Union[EvaluationResults, dict]:
    """Convert a raw evaluator return value into dict form.

    Numbers and booleans become `{"score": ...}`, strings become
    `{"value": ...}`, a list of dicts becomes `{"results": [...]}`, and a
    non-empty dict is returned unchanged.

    Raises:
        ValueError: If the result is empty or `None`, is a list containing
            non-dict items, or has an unsupported type.
    """
    # Scalars are handled first: 0, 0.0 and False are valid scores even though
    # they are falsy.
    if isinstance(result, (bool, float, int)):
        return {"score": result}
    if not result:
        raise ValueError(
            f"Expected a non-empty dict, str, bool, int, float, list, "
            f"EvaluationResult, or EvaluationResults. Got {result}"
        )
    if isinstance(result, list):
        if any(not isinstance(item, dict) for item in result):
            raise ValueError(
                f"Expected a list of dicts or EvaluationResults. Received {result}."
            )
        return {"results": result}  # type: ignore[misc]
    if isinstance(result, str):
        return {"value": result}
    if isinstance(result, dict):
        return result
    raise ValueError(
        f"Expected a dict, str, bool, int, float, list, EvaluationResult, or "
        f"EvaluationResults. Got {result}"
    )

946 

947 

# Signature of a summary evaluator: takes all runs and all examples and returns
# one result or a batch of results.
SUMMARY_EVALUATOR_T = Union[
    # Variant accepting any sequences of runs and examples.
    Callable[
        [Sequence[schemas.Run], Sequence[schemas.Example]],
        Union[EvaluationResult, EvaluationResults],
    ],
    # Variant accepting concrete lists of runs and examples.
    Callable[
        [list[schemas.Run], list[schemas.Example]],
        Union[EvaluationResult, EvaluationResults],
    ],
]

958 

959 

def _normalize_summary_evaluator(func: Callable) -> SUMMARY_EVALUATOR_T:
    """Adapt a user summary evaluator to the `(runs, examples)` convention.

    Follows the same rules as the per-run and comparison normalizers in this
    module: `**kwargs` is not counted as an argument, and parameters with
    defaults do not prevent the function from being called by argument name.

    Args:
        func: The user's evaluator. Its parameters are either two custom-named
            positional arguments (treated as runs and examples) or any of the
            supported names listed below; extra parameters need defaults.

    Returns:
        `func` unchanged when it already takes `(runs, examples)` or uses
        custom names; otherwise a wrapper taking `(runs, examples)` that calls
        `func` with the named arguments and formats non-`EvaluationResult`
        returns via `_format_evaluator_result`.

    Raises:
        ValueError: If `func` has no arguments, or has unsupported required
            arguments and does not have exactly two required arguments.
    """
    supported_args = ("runs", "examples", "inputs", "outputs", "reference_outputs")
    sig = inspect.signature(func)
    all_args = [pname for pname, p in sig.parameters.items() if p.kind != p.VAR_KEYWORD]
    args_with_defaults = [
        pname
        for pname, p in sig.parameters.items()
        if p.default is not inspect.Parameter.empty
    ]
    required_args = [a for a in all_args if a not in args_with_defaults]
    only_known_args = all(
        pname in supported_args or pname in args_with_defaults for pname in all_args
    )
    if not all_args or (not only_known_args and len(required_args) != 2):
        msg = (
            f"Invalid evaluator function. Must have at least one "
            f"argument. Supported arguments are {supported_args}."
        )
        if all_args:
            msg += f" Received arguments {all_args}."
        raise ValueError(msg)
    # For backwards compatibility we assume custom arg names are Sequence[Run] and
    # Sequence[Example] types, respectively.
    if not only_known_args or all_args == ["runs", "examples"]:
        return func

    def wrapper(
        runs: Sequence[schemas.Run], examples: Sequence[schemas.Example]
    ) -> Union[EvaluationResult, EvaluationResults]:
        arg_map = {
            "runs": runs,
            "examples": examples,
            "inputs": [example.inputs for example in examples],
            "outputs": [run.outputs or {} for run in runs],
            "reference_outputs": [example.outputs or {} for example in examples],
        }
        kwargs = {}
        args = []
        for param_name, param in sig.parameters.items():
            # Could have params with defaults that are not in the arg map
            if param_name in arg_map:
                if param.kind in (
                    param.POSITIONAL_OR_KEYWORD,
                    param.POSITIONAL_ONLY,
                ):
                    args.append(arg_map[param_name])
                else:
                    kwargs[param_name] = arg_map[param_name]

        result = func(*args, **kwargs)
        if isinstance(result, EvaluationResult):
            return result
        return _format_evaluator_result(result)  # type: ignore

    # Keep the user's function name on the wrapper.
    wrapper.__name__ = getattr(func, "__name__", wrapper.__name__)
    return wrapper  # type: ignore[return-value]