Coverage for langsmith/testing/_internal.py: 4%
479 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-12-11 16:15 -0800
« prev ^ index » next coverage.py v7.10.1, created at 2025-12-11 16:15 -0800
1from __future__ import annotations
3import atexit
4import contextlib
5import contextvars
6import datetime
7import functools
8import hashlib
9import importlib
10import inspect
11import logging
12import os
13import threading
14import time
15import uuid
16import warnings
17from collections.abc import Generator, Sequence
18from concurrent.futures import Future
19from pathlib import Path
20from typing import (
21 Any,
22 Callable,
23 Optional,
24 TypeVar,
25 Union,
26 cast,
27 overload,
28)
30from typing_extensions import TypedDict
32from langsmith import client as ls_client
33from langsmith import env as ls_env
34from langsmith import run_helpers as rh
35from langsmith import run_trees
36from langsmith import run_trees as rt
37from langsmith import schemas as ls_schemas
38from langsmith import utils as ls_utils
39from langsmith._internal import _orjson
40from langsmith._internal._serde import dumps_json
41from langsmith.client import ID_TYPE
try:
    import pytest  # type: ignore

    # Reuse pytest's own skip exception so skipped tests can be recognised.
    SkipException = pytest.skip.Exception
except ImportError:
    # pytest is not importable: define a local stand-in so that
    # `except SkipException` clauses in this module stay valid.

    class SkipException(Exception):  # type: ignore[no-redef]
        pass


logger = logging.getLogger(__name__)

# UUID5 namespace used for generating consistent example IDs
UUID5_NAMESPACE = uuid.UUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8")

# Generic type variables for annotations.
T = TypeVar("T")
U = TypeVar("U")
def _object_hash(obj: Any) -> str:
    """Return the SHA-256 hex digest of the stringified form of ``obj``."""
    # The object is first turned into text by the module's ``_stringify``
    # helper; the digest is computed over that text's encoded bytes.
    payload = _stringify(obj).encode()
    digest = hashlib.sha256(payload)
    return digest.hexdigest()
# Typing-only overloads for ``test``: it can be applied bare (``@test``) or
# called with keyword options (``@test(...)``) to obtain a decorator.
@overload
def test(
    func: Callable,
) -> Callable: ...


@overload
def test(
    *,
    id: Optional[uuid.UUID] = None,
    output_keys: Optional[Sequence[str]] = None,
    client: Optional[ls_client.Client] = None,
    test_suite_name: Optional[str] = None,
    metadata: Optional[dict] = None,
    repetitions: Optional[int] = None,
    # Spelled with ``Union[...]`` (not ``str | list[str]``) to match the other
    # ``split`` annotations in this module and to stay evaluable at runtime on
    # interpreters without PEP 604 support.
    split: Optional[Union[str, list[str]]] = None,
    cached_hosts: Optional[Sequence[str]] = None,
    # ``cache`` is accepted by the implementation (it is popped from kwargs
    # and resolved to the cache directory), so it is declared here as well.
    cache: Optional[str] = None,
) -> Callable[[Callable], Callable]: ...
def test(*args: Any, **kwargs: Any) -> Callable:
    """Trace a pytest test case in LangSmith.

    This decorator is used to trace a pytest test to LangSmith. It ensures
    that the necessary example data is created and associated with the test function.
    The decorated function will be executed as a test case, and the results will be
    recorded and reported by LangSmith.

    Args:
        - id (Optional[uuid.UUID]): A unique identifier for the test case. If not
            provided, an ID will be generated based on the test function's module
            and name.
        - output_keys (Optional[Sequence[str]]): A list of keys to be considered as
            the output keys for the test case. These keys will be extracted from the
            test function's inputs and stored as the expected outputs.
        - client (Optional[ls_client.Client]): An instance of the LangSmith client
            to be used for communication with the LangSmith service. If not provided,
            a default client will be used.
        - test_suite_name (Optional[str]): The name of the test suite to which the
            test case belongs. If not provided, the test suite name will be determined
            based on the environment or the package name.
        - metadata (Optional[dict]): Metadata to store on the example. The same
            keys are also added to the test run's trace metadata with an
            `ls_example_` prefix.
        - repetitions (Optional[int]): Number of times to run the test body per
            invocation. Defaults to 1.
        - split (Optional[Union[str, list[str]]]): Dataset split(s) passed along
            when the example is created or updated.
        - cache (Optional[str]): Cache directory to use for this test, as an
            alternative to setting `LANGSMITH_TEST_CACHE`.
        - cached_hosts (Optional[Sequence[str]]): A list of hosts or URL prefixes to
            cache requests to during testing. If not provided, all requests will be
            cached (default behavior). This is useful for caching only specific
            API calls (e.g., ["api.openai.com"] or ["https://api.openai.com"]).
            Requires caching to be enabled, otherwise a `ValueError` is raised.

    Returns:
        Callable: The decorated test function.

    Environment:
        - `LANGSMITH_TEST_CACHE`: If set, API calls will be cached to disk to
            save time and costs during testing. Recommended to commit the
            cache files to your repository for faster CI/CD runs.
            Requires the 'langsmith[vcr]' package to be installed.
        - `LANGSMITH_TEST_TRACKING`: Set this variable to 'false' to skip
            LangSmith test tracking. The decorated function is then called
            directly, without creating examples, runs or feedback.

    Example:
        For basic usage, simply decorate a test function with `@pytest.mark.langsmith`.
        Under the hood this will call the `test` method:

        ```python
        import pytest


        # Equivalently can decorate with `test` directly:
        # from langsmith import test
        # @test
        @pytest.mark.langsmith
        def test_addition():
            assert 3 + 4 == 7
        ```


        Any code that is traced (such as those traced using `@traceable`
        or `wrap_*` functions) will be traced within the test case for
        improved visibility and debugging.

        ```python
        import pytest
        from langsmith import traceable


        @traceable
        def generate_numbers():
            return 3, 4


        @pytest.mark.langsmith
        def test_nested():
            # Traced code will be included in the test case
            a, b = generate_numbers()
            assert a + b == 7
        ```

        LLM calls are expensive! Cache requests by setting
        `LANGSMITH_TEST_CACHE=path/to/cache`. Check in these files to speed up
        CI/CD pipelines, so your results only change when your prompt or requested
        model changes.

        Note that this will require that you install langsmith with the `vcr` extra:

        `pip install -U "langsmith[vcr]"`

        Caching is faster if you install libyaml. See
        https://vcrpy.readthedocs.io/en/latest/installation.html#speed for more details.

        ```python
        # os.environ["LANGSMITH_TEST_CACHE"] = "tests/cassettes"
        import openai
        import pytest
        from langsmith import wrappers

        oai_client = wrappers.wrap_openai(openai.Client())


        @pytest.mark.langsmith
        def test_openai_says_hello():
            # Traced code will be included in the test case
            response = oai_client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": "You are a helpful assistant."},
                    {"role": "user", "content": "Say hello!"},
                ],
            )
            assert "hello" in response.choices[0].message.content.lower()
        ```

        You can also specify which hosts to cache by using the `cached_hosts` parameter.
        This is useful when you only want to cache specific API calls:

        ```python
        @pytest.mark.langsmith(cached_hosts=["https://api.openai.com"])
        def test_openai_with_selective_caching():
            # Only OpenAI API calls will be cached, other API calls will not
            # be cached
            response = oai_client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": "You are a helpful assistant."},
                    {"role": "user", "content": "Say hello!"},
                ],
            )
            assert "hello" in response.choices[0].message.content.lower()
        ```

        LLMs are stochastic. Naive assertions are flakey. You can use langsmith's
        `expect` to score and make approximate assertions on your results.

        ```python
        import pytest
        from langsmith import expect


        @pytest.mark.langsmith
        def test_output_semantically_close():
            response = oai_client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": "You are a helpful assistant."},
                    {"role": "user", "content": "Say hello!"},
                ],
            )
            # The embedding_distance call logs the embedding distance to LangSmith
            expect.embedding_distance(
                prediction=response.choices[0].message.content,
                reference="Hello!",
                # The following optional assertion logs a
                # pass/fail score to LangSmith
                # and raises an AssertionError if the assertion fails.
            ).to_be_less_than(1.0)
            # Compute damerau_levenshtein distance
            expect.edit_distance(
                prediction=response.choices[0].message.content,
                reference="Hello!",
                # And then log a pass/fail score to LangSmith
            ).to_be_less_than(1.0)
        ```

        The `@test` decorator works natively with pytest fixtures.
        The values will populate the "inputs" of the corresponding example in LangSmith.

        ```python
        import pytest


        @pytest.fixture
        def some_input():
            return "Some input"


        @pytest.mark.langsmith
        def test_with_fixture(some_input: str):
            assert "input" in some_input
        ```

        You can still use `pytest.parametrize()` as usual to run multiple test cases
        using the same test function.

        ```python
        import pytest


        @pytest.mark.langsmith(output_keys=["expected"])
        @pytest.mark.parametrize(
            "a, b, expected",
            [
                (1, 2, 3),
                (3, 4, 7),
            ],
        )
        def test_addition_with_multiple_inputs(a: int, b: int, expected: int):
            assert a + b == expected
        ```

        By default, each test case will be assigned a consistent, unique identifier
        based on the function name and module. You can also provide a custom identifier
        using the `id` argument:

        ```python
        import pytest
        import uuid

        example_id = uuid.uuid4()


        @pytest.mark.langsmith(id=str(example_id))
        def test_multiplication():
            assert 3 * 4 == 12
        ```

        By default, all test inputs are saved as "inputs" to a dataset.
        You can specify the `output_keys` argument to persist those keys
        within the dataset's "outputs" fields.

        ```python
        import pytest


        @pytest.fixture
        def expected_output():
            return "input"


        @pytest.mark.langsmith(output_keys=["expected_output"])
        def test_with_expected_output(some_input: str, expected_output: str):
            assert expected_output in some_input
        ```


        To run these tests, use the pytest CLI. Or directly run the test functions.

        ```python
        test_output_semantically_close()
        test_addition()
        test_nested()
        test_with_fixture("Some input")
        test_with_expected_output("Some input", "Some")
        test_multiplication()
        test_openai_says_hello()
        test_addition_with_multiple_inputs(1, 2, 3)
        ```
    """
    cached_hosts = kwargs.pop("cached_hosts", None)
    cache_dir = ls_utils.get_cache_dir(kwargs.pop("cache", None))

    # Validate cached_hosts usage
    if cached_hosts and not cache_dir:
        raise ValueError(
            "cached_hosts parameter requires caching to be enabled. "
            "Please set the LANGSMITH_TEST_CACHE environment variable "
            "to a cache directory path, "
            "or pass a cache parameter to the test decorator. "
            "Example: LANGSMITH_TEST_CACHE='tests/cassettes' "
            "or @pytest.mark.langsmith(cache='tests/cassettes', cached_hosts=[...])"
        )

    # Every known option is popped so that anything left in ``kwargs`` is
    # unexpected and can be reported below.
    langtest_extra = _UTExtra(
        id=kwargs.pop("id", None),
        output_keys=kwargs.pop("output_keys", None),
        client=kwargs.pop("client", None),
        test_suite_name=kwargs.pop("test_suite_name", None),
        cache=cache_dir,
        metadata=kwargs.pop("metadata", None),
        repetitions=kwargs.pop("repetitions", None),
        split=kwargs.pop("split", None),
        cached_hosts=cached_hosts,
    )
    if kwargs:
        warnings.warn(f"Unexpected keyword arguments: {kwargs.keys()}")
    # Evaluated once, at decoration time, and captured by the wrappers below.
    disable_tracking = ls_utils.test_tracking_is_disabled()
    if disable_tracking:
        logger.info(
            "LANGSMITH_TEST_TRACKING is set to 'false'."
            " Skipping LangSmith test tracking."
        )

    def decorator(func: Callable) -> Callable:
        # Handle repetitions
        # ``or 1`` covers both a missing value and an explicit None/0.
        repetitions = langtest_extra.get("repetitions", 1) or 1

        if inspect.iscoroutinefunction(func):

            @functools.wraps(func)
            async def async_wrapper(
                *test_args: Any, request: Any = None, **test_kwargs: Any
            ):
                # ``request`` is consumed here and not forwarded to ``func``.
                if disable_tracking:
                    return await func(*test_args, **test_kwargs)

                # Run test multiple times for repetitions
                for i in range(repetitions):
                    # Each repetition gets its own shallow copy of the options.
                    repetition_extra = langtest_extra.copy()
                    await _arun_test(
                        func,
                        *test_args,
                        pytest_request=request,
                        **test_kwargs,
                        langtest_extra=repetition_extra,
                    )

            return async_wrapper

        @functools.wraps(func)
        def wrapper(*test_args: Any, request: Any = None, **test_kwargs: Any):
            # ``request`` is consumed here and not forwarded to ``func``.
            if disable_tracking:
                return func(*test_args, **test_kwargs)

            # Run test multiple times for repetitions
            for i in range(repetitions):
                # Each repetition gets its own shallow copy of the options.
                repetition_extra = langtest_extra.copy()
                _run_test(
                    func,
                    *test_args,
                    pytest_request=request,
                    **test_kwargs,
                    langtest_extra=repetition_extra,
                )

        return wrapper

    # Bare ``@test`` usage: the decorated function arrives as the sole
    # positional argument.
    if args and callable(args[0]):
        return decorator(args[0])

    return decorator
418## Private functions
421def _get_experiment_name(test_suite_name: str) -> str:
422 # If this is a pytest-xdist multi-process run then we need to create the same
423 # experiment name across processes. We can do this by accessing the
424 # PYTEST_XDIST_TESTRUNID env var.
425 if os.environ.get("PYTEST_XDIST_TESTRUNUID") and importlib.util.find_spec("xdist"):
426 id_name = test_suite_name + os.environ["PYTEST_XDIST_TESTRUNUID"]
427 id_ = str(uuid.uuid5(uuid.NAMESPACE_DNS, id_name).hex[:8])
428 else:
429 id_ = str(uuid.uuid4().hex[:8])
431 if os.environ.get("LANGSMITH_EXPERIMENT"):
432 prefix = os.environ["LANGSMITH_EXPERIMENT"]
433 else:
434 prefix = ls_utils.get_tracer_project(False) or "TestSuiteResult"
435 name = f"{prefix}:{id_}"
436 return name
def _get_test_suite_name(func: Callable) -> str:
    """Resolve the test suite name for ``func``.

    The ``TEST_SUITE`` environment setting (read through
    ``ls_utils.get_env_var``) wins when present; otherwise the name is
    ``"<repo_name>.<module name>"`` for the module that defines ``func``.

    Raises:
        ValueError: If no module can be determined for ``func``.
    """
    configured = ls_utils.get_env_var("TEST_SUITE")
    if configured:
        return configured

    repo = ls_env.get_git_info()["repo_name"]
    module_name = None
    try:
        module = inspect.getmodule(func)
        if module:
            module_name = module.__name__
    except BaseException:
        logger.debug("Could not determine test suite name from file path.")
        module_name = None

    if module_name is None:
        raise ValueError("Please set the LANGSMITH_TEST_SUITE environment variable.")
    return f"{repo}.{module_name}"
454def _get_test_suite(
455 client: ls_client.Client, test_suite_name: str
456) -> ls_schemas.Dataset:
457 if client.has_dataset(dataset_name=test_suite_name):
458 return client.read_dataset(dataset_name=test_suite_name)
459 else:
460 repo = ls_env.get_git_info().get("remote_url") or ""
461 description = "Test suite"
462 if repo:
463 description += f" for {repo}"
464 try:
465 return client.create_dataset(
466 dataset_name=test_suite_name,
467 description=description,
468 metadata={"__ls_runner": "pytest"},
469 )
470 except ls_utils.LangSmithConflictError:
471 return client.read_dataset(dataset_name=test_suite_name)
def _start_experiment(
    client: ls_client.Client,
    test_suite: ls_schemas.Dataset,
) -> ls_schemas.TracerSession:
    """Create the experiment project for ``test_suite``, or read it on conflict.

    The project name comes from ``_get_experiment_name`` and the project
    references the suite's dataset id.
    """
    experiment_name = _get_experiment_name(test_suite.name)
    try:
        revision_id = ls_env.get_langchain_env_var_metadata().get("revision_id")
        project_metadata = {
            "revision_id": revision_id,
            "__ls_runner": "pytest",
        }
        project = client.create_project(
            experiment_name,
            reference_dataset_id=test_suite.id,
            description="Test Suite Results.",
            metadata=project_metadata,
        )
    except ls_utils.LangSmithConflictError:
        # A project with this name already exists; reuse it.
        project = client.read_project(project_name=experiment_name)
    return project
def _get_example_id(
    dataset_id: str,
    inputs: dict,
    outputs: Optional[dict] = None,
) -> uuid.UUID:
    """Derive a UUID5 example ID from the dataset ID, inputs and outputs.

    Inputs and outputs are hashed individually (missing outputs hash as an
    empty dict) and combined with the dataset ID into the UUID5 name.
    """
    inputs_digest = _object_hash(inputs)
    outputs_digest = _object_hash(outputs or {})
    identifier = _stringify((dataset_id, inputs_digest, outputs_digest))
    return uuid.uuid5(UUID5_NAMESPACE, identifier)
506def _get_example_id_legacy(
507 func: Callable, inputs: Optional[dict], suite_id: uuid.UUID
508) -> tuple[uuid.UUID, str]:
509 try:
510 file_path = str(Path(inspect.getfile(func)).relative_to(Path.cwd()))
511 except ValueError:
512 # Fall back to module name if file path is not available
513 file_path = func.__module__
514 identifier = f"{suite_id}{file_path}::{func.__name__}"
515 # If parametrized test, need to add inputs to identifier:
516 if hasattr(func, "pytestmark") and any(
517 m.name == "parametrize" for m in func.pytestmark
518 ):
519 identifier += _stringify(inputs)
520 return uuid.uuid5(uuid.NAMESPACE_DNS, identifier), identifier[len(str(suite_id)) :]
def _end_tests(test_suite: _LangSmithTestSuite):
    """Finalize a test suite: flush work, then record git/dataset metadata.

    Shuts down the suite's executor, updates the experiment project's
    metadata, and, when a dataset version is known, tags that version with
    the current git commit and branch (each only if available).

    Args:
        test_suite: The suite to finalize.
    """
    git_info = ls_env.get_git_info() or {}
    test_suite.shutdown()
    dataset_version = test_suite.get_dataset_version()
    dataset_id = test_suite._dataset.id
    test_suite.client.update_project(
        test_suite.experiment_id,
        metadata={
            **git_info,
            "dataset_version": dataset_version,
            "revision_id": ls_env.get_langchain_env_var_metadata().get("revision_id"),
            "__ls_runner": "pytest",
        },
    )
    if not dataset_version:
        return
    for ref_kind in ("commit", "branch"):
        # ``.get`` rather than indexing: ``git_info`` may be the empty
        # fallback dict above, in which case there is nothing to tag.
        ref_value = git_info.get(ref_kind)
        if ref_value is not None:
            test_suite.client.update_dataset_tag(
                dataset_id=dataset_id,
                as_of=dataset_version,
                tag=f"git:{ref_kind}:{ref_value}",
            )
551VT = TypeVar("VT", bound=Optional[dict])
554def _serde_example_values(values: VT) -> VT:
555 if values is None:
556 return cast(VT, values)
557 bts = ls_client._dumps_json(values)
558 return _orjson.loads(bts)
561class _LangSmithTestSuite:
562 _instances: Optional[dict] = None
563 _lock = threading.RLock()
565 def __init__(
566 self,
567 client: Optional[ls_client.Client],
568 experiment: ls_schemas.TracerSession,
569 dataset: ls_schemas.Dataset,
570 ):
571 self.client = client or rt.get_cached_client()
572 self._experiment = experiment
573 self._dataset = dataset
574 self._dataset_version: Optional[datetime.datetime] = dataset.modified_at
575 self._executor = ls_utils.ContextThreadPoolExecutor()
576 atexit.register(_end_tests, self)
578 @property
579 def id(self):
580 return self._dataset.id
582 @property
583 def experiment_id(self):
584 return self._experiment.id
586 @property
587 def experiment(self):
588 return self._experiment
590 @classmethod
591 def from_test(
592 cls,
593 client: Optional[ls_client.Client],
594 func: Callable,
595 test_suite_name: Optional[str] = None,
596 ) -> _LangSmithTestSuite:
597 client = client or rt.get_cached_client()
598 test_suite_name = test_suite_name or _get_test_suite_name(func)
599 with cls._lock:
600 if not cls._instances:
601 cls._instances = {}
602 if test_suite_name not in cls._instances:
603 test_suite = _get_test_suite(client, test_suite_name)
604 experiment = _start_experiment(client, test_suite)
605 cls._instances[test_suite_name] = cls(client, experiment, test_suite)
606 return cls._instances[test_suite_name]
608 @property
609 def name(self):
610 return self._experiment.name
612 def get_dataset_version(self):
613 return self._dataset_version
615 def submit_result(
616 self,
617 run_id: uuid.UUID,
618 error: Optional[str] = None,
619 skipped: bool = False,
620 pytest_plugin: Any = None,
621 pytest_nodeid: Any = None,
622 ) -> None:
623 if skipped:
624 score = None
625 status = "skipped"
626 elif error:
627 score = 0
628 status = "failed"
629 else:
630 score = 1
631 status = "passed"
632 if pytest_plugin and pytest_nodeid:
633 pytest_plugin.update_process_status(pytest_nodeid, {"status": status})
634 self._executor.submit(self._submit_result, run_id, score)
636 def _submit_result(self, run_id: uuid.UUID, score: Optional[int]) -> None:
637 # trace_id will always be run_id here because the feedback is on the root
638 # test run
639 self.client.create_feedback(run_id, key="pass", score=score, trace_id=run_id)
641 def sync_example(
642 self,
643 example_id: uuid.UUID,
644 *,
645 inputs: Optional[dict] = None,
646 outputs: Optional[dict] = None,
647 metadata: Optional[dict] = None,
648 split: Optional[Union[str, list[str]]] = None,
649 pytest_plugin=None,
650 pytest_nodeid=None,
651 ) -> None:
652 inputs = inputs or {}
653 if pytest_plugin and pytest_nodeid:
654 update = {"inputs": inputs, "reference_outputs": outputs}
655 update = {k: v for k, v in update.items() if v is not None}
656 pytest_plugin.update_process_status(pytest_nodeid, update)
657 metadata = metadata.copy() if metadata else metadata
658 inputs = _serde_example_values(inputs)
659 outputs = _serde_example_values(outputs)
660 try:
661 example = self.client.read_example(example_id=example_id)
662 except ls_utils.LangSmithNotFoundError:
663 example = self.client.create_example(
664 example_id=example_id,
665 inputs=inputs,
666 outputs=outputs,
667 dataset_id=self.id,
668 metadata=metadata,
669 split=split,
670 created_at=self._experiment.start_time,
671 )
672 else:
673 normalized_split = split
674 if isinstance(normalized_split, str):
675 normalized_split = [normalized_split]
676 if normalized_split and metadata:
677 metadata["dataset_split"] = normalized_split
678 existing_dataset_split = (example.metadata or {}).pop("dataset_split")
679 if (
680 (inputs != example.inputs)
681 or (outputs is not None and outputs != example.outputs)
682 or (metadata is not None and metadata != example.metadata)
683 or str(example.dataset_id) != str(self.id)
684 or (
685 normalized_split is not None
686 and existing_dataset_split != normalized_split
687 )
688 ):
689 self.client.update_example(
690 example_id=example.id,
691 inputs=inputs,
692 outputs=outputs,
693 metadata=metadata,
694 split=split,
695 dataset_id=self.id,
696 )
697 example = self.client.read_example(example_id=example.id)
698 if self._dataset_version is None:
699 self._dataset_version = example.modified_at
700 elif (
701 example.modified_at
702 and self._dataset_version
703 and example.modified_at > self._dataset_version
704 ):
705 self._dataset_version = example.modified_at
707 def _submit_feedback(
708 self,
709 run_id: ID_TYPE,
710 feedback: Union[dict, list],
711 pytest_plugin: Any = None,
712 pytest_nodeid: Any = None,
713 **kwargs: Any,
714 ):
715 feedback = feedback if isinstance(feedback, list) else [feedback]
716 for fb in feedback:
717 if pytest_plugin and pytest_nodeid:
718 val = fb["score"] if "score" in fb else fb["value"]
719 pytest_plugin.update_process_status(
720 pytest_nodeid, {"feedback": {fb["key"]: val}}
721 )
722 self._executor.submit(
723 self._create_feedback, run_id=run_id, feedback=fb, **kwargs
724 )
726 def _create_feedback(self, run_id: ID_TYPE, feedback: dict, **kwargs: Any) -> None:
727 # trace_id will always be run_id here because the feedback is on the root
728 # test run
729 self.client.create_feedback(run_id, **feedback, **kwargs, trace_id=run_id)
731 def shutdown(self):
732 self._executor.shutdown()
734 def end_run(
735 self,
736 run_tree,
737 example_id,
738 outputs,
739 reference_outputs,
740 metadata,
741 split,
742 pytest_plugin=None,
743 pytest_nodeid=None,
744 ) -> Future:
745 return self._executor.submit(
746 self._end_run,
747 run_tree=run_tree,
748 example_id=example_id,
749 outputs=outputs,
750 reference_outputs=reference_outputs,
751 metadata=metadata,
752 split=split,
753 pytest_plugin=pytest_plugin,
754 pytest_nodeid=pytest_nodeid,
755 )
757 def _end_run(
758 self,
759 run_tree,
760 example_id,
761 outputs,
762 reference_outputs,
763 metadata,
764 split,
765 pytest_plugin,
766 pytest_nodeid,
767 ) -> None:
768 # TODO: remove this hack so that run durations are correct
769 # Ensure example is fully updated
770 self.sync_example(
771 example_id,
772 inputs=run_tree.inputs,
773 outputs=reference_outputs,
774 split=split,
775 metadata=metadata,
776 )
777 run_tree.reference_example_id = example_id
778 run_tree.end(outputs=outputs, metadata={"reference_example_id": example_id})
779 run_tree.patch()
782class _TestCase:
783 def __init__(
784 self,
785 test_suite: _LangSmithTestSuite,
786 run_id: uuid.UUID,
787 example_id: Optional[uuid.UUID] = None,
788 metadata: Optional[dict] = None,
789 split: Optional[Union[str, list[str]]] = None,
790 pytest_plugin: Any = None,
791 pytest_nodeid: Any = None,
792 inputs: Optional[dict] = None,
793 reference_outputs: Optional[dict] = None,
794 ) -> None:
795 self.test_suite = test_suite
796 self.example_id = example_id
797 self.run_id = run_id
798 self.metadata = metadata
799 self.split = split
800 self.pytest_plugin = pytest_plugin
801 self.pytest_nodeid = pytest_nodeid
802 self.inputs = inputs
803 self.reference_outputs = reference_outputs
804 self._logged_reference_outputs: Optional[dict] = None
805 self._logged_outputs: Optional[dict] = None
807 if pytest_plugin and pytest_nodeid:
808 pytest_plugin.add_process_to_test_suite(
809 test_suite._dataset.name, pytest_nodeid
810 )
811 if inputs:
812 self.log_inputs(inputs)
813 if reference_outputs:
814 self.log_reference_outputs(reference_outputs)
816 def submit_feedback(self, *args, **kwargs: Any):
817 self.test_suite._submit_feedback(
818 *args,
819 **{
820 **kwargs,
821 **dict(
822 pytest_plugin=self.pytest_plugin,
823 pytest_nodeid=self.pytest_nodeid,
824 ),
825 },
826 )
828 def log_inputs(self, inputs: dict) -> None:
829 if self.pytest_plugin and self.pytest_nodeid:
830 self.pytest_plugin.update_process_status(
831 self.pytest_nodeid, {"inputs": inputs}
832 )
834 def log_outputs(self, outputs: dict) -> None:
835 self._logged_outputs = outputs
836 if self.pytest_plugin and self.pytest_nodeid:
837 self.pytest_plugin.update_process_status(
838 self.pytest_nodeid, {"outputs": outputs}
839 )
841 def log_reference_outputs(self, reference_outputs: dict) -> None:
842 self._logged_reference_outputs = reference_outputs
843 if self.pytest_plugin and self.pytest_nodeid:
844 self.pytest_plugin.update_process_status(
845 self.pytest_nodeid, {"reference_outputs": reference_outputs}
846 )
848 def submit_test_result(
849 self,
850 error: Optional[str] = None,
851 skipped: bool = False,
852 ) -> None:
853 return self.test_suite.submit_result(
854 self.run_id,
855 error=error,
856 skipped=skipped,
857 pytest_plugin=self.pytest_plugin,
858 pytest_nodeid=self.pytest_nodeid,
859 )
861 def start_time(self) -> None:
862 if self.pytest_plugin and self.pytest_nodeid:
863 self.pytest_plugin.update_process_status(
864 self.pytest_nodeid, {"start_time": time.time()}
865 )
867 def end_time(self) -> None:
868 if self.pytest_plugin and self.pytest_nodeid:
869 self.pytest_plugin.update_process_status(
870 self.pytest_nodeid, {"end_time": time.time()}
871 )
873 def end_run(self, run_tree, outputs: Any) -> None:
874 if not (outputs is None or isinstance(outputs, dict)):
875 outputs = {"output": outputs}
876 example_id = self.example_id or _get_example_id(
877 dataset_id=str(self.test_suite.id),
878 inputs=self.inputs or {},
879 outputs=outputs,
880 )
881 self.test_suite.end_run(
882 run_tree,
883 example_id,
884 outputs,
885 reference_outputs=self._logged_reference_outputs,
886 metadata=self.metadata,
887 split=self.split,
888 pytest_plugin=self.pytest_plugin,
889 pytest_nodeid=self.pytest_nodeid,
890 )
# Context variable holding the current test case; it defaults to None and is
# set by `_run_test` / `_arun_test` when they create a test case.
_TEST_CASE = contextvars.ContextVar[Optional[_TestCase]]("_TEST_CASE", default=None)
class _UTExtra(TypedDict, total=False):
    """Options collected by the `test` decorator and passed to the runners."""

    # Client to use; a cached default client is used when None.
    client: Optional[ls_client.Client]
    # Explicit example ID for the test case.
    id: Optional[uuid.UUID]
    # Input argument names to move from inputs into reference outputs.
    output_keys: Optional[Sequence[str]]
    # Explicit test suite name.
    test_suite_name: Optional[str]
    # Resolved cache directory; falsy when caching is disabled.
    cache: Optional[str]
    # Example metadata.
    metadata: Optional[dict]
    # Number of times to run the test body.
    repetitions: Optional[int]
    # Dataset split(s) for the example.
    split: Optional[Union[str, list[str]]]
    # Hosts / URL prefixes whose requests should be cached.
    cached_hosts: Optional[Sequence[str]]
def _create_test_case(
    func: Callable,
    *args: Any,
    pytest_request: Any,
    langtest_extra: _UTExtra,
    **kwargs: Any,
) -> _TestCase:
    """Build the `_TestCase` for one invocation of a decorated test.

    Binds the call arguments to ``func``'s signature to obtain the inputs,
    moves any ``output_keys`` from the inputs into reference outputs,
    resolves the test suite and example ID, and wires up the pytest output
    plugin when a ``pytest_request`` is available.

    Raises:
        ValueError: If ``output_keys`` is given but the test has no inputs.
    """
    client = langtest_extra["client"] or rt.get_cached_client()
    output_keys = langtest_extra["output_keys"]
    metadata = langtest_extra["metadata"]
    split = langtest_extra["split"]
    inputs = rh._get_inputs_safe(inspect.signature(func), *args, **kwargs) or None

    reference_outputs = None
    if output_keys:
        if not inputs:
            raise ValueError(
                "'output_keys' should only be specified when marked test function has "
                "input arguments."
            )
        # Popping removes the expected values from the inputs.
        reference_outputs = {key: inputs.pop(key, None) for key in output_keys}

    test_suite = _LangSmithTestSuite.from_test(
        client, func, langtest_extra.get("test_suite_name")
    )
    example_id = langtest_extra["id"]

    dataset_metadata = test_suite._dataset.metadata
    runtime_info = dataset_metadata.get("runtime") if dataset_metadata else None
    dataset_sdk_version = runtime_info.get("sdk_version") if runtime_info else None
    uses_legacy_ids = not dataset_sdk_version or not (
        ls_utils.is_version_greater_or_equal(dataset_sdk_version, "0.4.33")
    )
    if uses_legacy_ids:
        # Datasets without a recorded SDK version, or with one older than
        # 0.4.33, fall back to the legacy ID scheme.
        legacy_example_id, _ = _get_example_id_legacy(func, inputs, test_suite.id)
        example_id = example_id or legacy_example_id

    if pytest_request:
        pytest_plugin = pytest_request.config.pluginmanager.get_plugin(
            "langsmith_output_plugin"
        )
        pytest_nodeid = pytest_request.node.nodeid
    else:
        pytest_plugin = None
        pytest_nodeid = None

    if pytest_plugin:
        comparison_url = (
            cast(str, test_suite._dataset.url)
            + "/compare?selectedSessions="
            + str(test_suite.experiment_id)
        )
        pytest_plugin.test_suite_urls[test_suite._dataset.name] = comparison_url

    return _TestCase(
        test_suite,
        run_id=uuid.uuid4(),
        example_id=example_id,
        metadata=metadata,
        split=split,
        inputs=inputs,
        reference_outputs=reference_outputs,
        pytest_plugin=pytest_plugin,
        pytest_nodeid=pytest_nodeid,
    )
def _run_test(
    func: Callable,
    *test_args: Any,
    pytest_request: Any,
    langtest_extra: _UTExtra,
    **test_kwargs: Any,
) -> None:
    """Run a synchronous test function as a traced LangSmith test case.

    Creates the test case, publishes it through ``_TEST_CASE``, then runs
    ``func`` inside a trace with tracing-context metadata and the optional
    request cache applied. Exceptions raised by ``func`` are reported and
    re-raised.
    """
    test_case = _create_test_case(
        func,
        *test_args,
        **test_kwargs,
        pytest_request=pytest_request,
        langtest_extra=langtest_extra,
    )
    _TEST_CASE.set(test_case)

    def _test():
        test_case.start_time()
        # NOTE(review): unlike `_arun_test`, no `reference_example_id` is passed
        # to the trace here — confirm whether that difference is intended.
        with rh.trace(
            name=getattr(func, "__name__", "Test"),
            run_id=test_case.run_id,
            inputs=test_case.inputs,
            metadata={
                # Experiment run metadata is prefixed with "ls_example_" in
                # the ingest backend, but we must reproduce this behavior here
                # because the example may not have been created before the trace
                # starts.
                f"ls_example_{k}": v
                for k, v in (test_case.metadata or {}).items()
            },
            project_name=test_case.test_suite.name,
            exceptions_to_handle=(SkipException,),
            # The run is ended explicitly through `test_case.end_run` below.
            _end_on_exit=False,
        ) as run_tree:
            try:
                result = func(*test_args, **test_kwargs)
            except SkipException as e:
                # Skips are reported as skipped, with the reason as the output.
                test_case.submit_test_result(error=repr(e), skipped=True)
                test_case.end_run(run_tree, {"skipped_reason": repr(e)})
                raise e
            except BaseException as e:
                # Any other failure is reported as failed, with no outputs.
                test_case.submit_test_result(error=repr(e))
                test_case.end_run(run_tree, None)
                raise e
            else:
                test_case.end_run(run_tree, result)
            finally:
                test_case.end_time()
        # Only reached when the test passed; a failure to submit the result
        # is logged rather than failing the test.
        try:
            test_case.submit_test_result()
        except BaseException as e:
            logger.warning(
                f"Failed to create feedback for run_id {test_case.run_id}:\n{e}"
            )

    # One cache file per test suite, named after the suite (dataset) ID.
    if langtest_extra["cache"]:
        cache_path = Path(langtest_extra["cache"]) / f"{test_case.test_suite.id}.yaml"
    else:
        cache_path = None
    current_context = rh.get_tracing_context()
    metadata = {
        **(current_context["metadata"] or {}),
        **{
            "experiment": test_case.test_suite.experiment.name,
        },
    }
    # Handle cached_hosts parameter
    # The client's own API URL is passed as a host to ignore.
    ignore_hosts = [test_case.test_suite.client.api_url]
    allow_hosts = langtest_extra.get("cached_hosts") or None

    with (
        rh.tracing_context(**{**current_context, "metadata": metadata}),
        ls_utils.with_optional_cache(
            cache_path, ignore_hosts=ignore_hosts, allow_hosts=allow_hosts
        ),
    ):
        _test()
async def _arun_test(
    func: Callable,
    *test_args: Any,
    pytest_request: Any,
    langtest_extra: _UTExtra,
    **test_kwargs: Any,
) -> None:
    """Run an async test function as a traced LangSmith test case.

    Creates the test case, publishes it through ``_TEST_CASE``, then awaits
    ``func`` inside a trace with tracing-context metadata and the optional
    request cache applied. Exceptions raised by ``func`` are reported and
    re-raised.
    """
    test_case = _create_test_case(
        func,
        *test_args,
        **test_kwargs,
        pytest_request=pytest_request,
        langtest_extra=langtest_extra,
    )
    _TEST_CASE.set(test_case)

    async def _test():
        test_case.start_time()
        with rh.trace(
            name=getattr(func, "__name__", "Test"),
            run_id=test_case.run_id,
            reference_example_id=test_case.example_id,
            inputs=test_case.inputs,
            metadata={
                # Experiment run metadata is prefixed with "ls_example_" in
                # the ingest backend, but we must reproduce this behavior here
                # because the example may not have been created before the trace
                # starts.
                f"ls_example_{k}": v
                for k, v in (test_case.metadata or {}).items()
            },
            project_name=test_case.test_suite.name,
            exceptions_to_handle=(SkipException,),
            # The run is ended explicitly through `test_case.end_run` below.
            _end_on_exit=False,
        ) as run_tree:
            try:
                result = await func(*test_args, **test_kwargs)
            except SkipException as e:
                # Skips are reported as skipped, with the reason as the output.
                test_case.submit_test_result(error=repr(e), skipped=True)
                test_case.end_run(run_tree, {"skipped_reason": repr(e)})
                raise e
            except BaseException as e:
                # Any other failure is reported as failed, with no outputs.
                test_case.submit_test_result(error=repr(e))
                test_case.end_run(run_tree, None)
                raise e
            else:
                test_case.end_run(run_tree, result)
            finally:
                test_case.end_time()
        # Only reached when the test passed; a failure to submit the result
        # is logged rather than failing the test.
        try:
            test_case.submit_test_result()
        except BaseException as e:
            logger.warning(
                f"Failed to create feedback for run_id {test_case.run_id}:\n{e}"
            )

    # One cache file per test suite, named after the suite (dataset) ID.
    if langtest_extra["cache"]:
        cache_path = Path(langtest_extra["cache"]) / f"{test_case.test_suite.id}.yaml"
    else:
        cache_path = None
    current_context = rh.get_tracing_context()
    metadata = {
        **(current_context["metadata"] or {}),
        **{
            "experiment": test_case.test_suite.experiment.name,
            # NOTE(review): `example_id` may be None at this point, which
            # would be recorded as the string "None" — confirm.
            "reference_example_id": str(test_case.example_id),
        },
    }
    # Handle cached_hosts parameter
    # The client's own API URL is passed as a host to ignore.
    ignore_hosts = [test_case.test_suite.client.api_url]
    cached_hosts = langtest_extra.get("cached_hosts")
    allow_hosts = cached_hosts if cached_hosts else None

    with (
        rh.tracing_context(**{**current_context, "metadata": metadata}),
        ls_utils.with_optional_cache(
            cache_path, ignore_hosts=ignore_hosts, allow_hosts=allow_hosts
        ),
    ):
        await _test()
# For backwards compatibility: `unit` is an alias bound to the same object as
# `test`.
unit = test
def log_inputs(inputs: dict, /) -> None:
    """Attach inputs to the current run tree and the active test case.

    !!! warning

        This API is in beta and might change in future versions.

    Meant to be called from pytest tests decorated with
    `@pytest.mark.langsmith`. Does nothing (besides an info log) when test
    tracking is disabled.

    Args:
        inputs: Inputs to log.

    Raises:
        ValueError: If there is no current run tree or no active test case.

    Example:
        ```python
        from langsmith import testing as t


        @pytest.mark.langsmith
        def test_foo() -> None:
            x = 0
            y = 1
            t.log_inputs({"x": x, "y": y})
            assert foo(x, y) == 2
        ```
    """
    if ls_utils.test_tracking_is_disabled():
        logger.info("LANGSMITH_TEST_TRACKING is set to 'false'. Skipping log_inputs.")
        return

    active_run = rh.get_current_run_tree()
    active_case = _TEST_CASE.get()
    if not (active_run and active_case):
        raise ValueError(
            "log_inputs should only be called within a pytest test decorated with "
            "@pytest.mark.langsmith, and with tracing enabled (by setting the "
            "LANGSMITH_TRACING environment variable to 'true')."
        )

    active_run.add_inputs(inputs)
    active_case.log_inputs(inputs)
def log_outputs(outputs: dict, /) -> None:
    """Attach outputs to the current run tree and the active test case.

    !!! warning

        This API is in beta and might change in future versions.

    Meant to be called from pytest tests decorated with
    `@pytest.mark.langsmith`. The outputs are passed through `_dumpd` before
    being recorded. Does nothing (besides an info log) when test tracking is
    disabled.

    Args:
        outputs: Outputs to log.

    Raises:
        ValueError: If there is no current run tree or no active test case.

    Example:
        ```python
        from langsmith import testing as t


        @pytest.mark.langsmith
        def test_foo() -> None:
            x = 0
            y = 1
            result = foo(x, y)
            t.log_outputs({"foo": result})
            assert result == 2
        ```
    """
    if ls_utils.test_tracking_is_disabled():
        logger.info("LANGSMITH_TEST_TRACKING is set to 'false'. Skipping log_outputs.")
        return

    active_run = rh.get_current_run_tree()
    active_case = _TEST_CASE.get()
    if not (active_run and active_case):
        raise ValueError(
            "log_outputs should only be called within a pytest test decorated with "
            "@pytest.mark.langsmith, and with tracing enabled (by setting the "
            "LANGSMITH_TRACING environment variable to 'true')."
        )

    serialized = _dumpd(outputs)
    active_run.add_outputs(serialized)
    active_case.log_outputs(serialized)
def log_reference_outputs(reference_outputs: dict, /) -> None:
    """Record reference (expected) outputs on the active test case.

    !!! warning

        This API is in beta and might change in future versions.

    Meant to be called from pytest tests decorated with
    `@pytest.mark.langsmith`. Does nothing (besides an info log) when test
    tracking is disabled.

    Args:
        reference_outputs: Reference outputs to log.

    Raises:
        ValueError: If there is no active test case.

    Example:
        ```python
        from langsmith import testing


        @pytest.mark.langsmith
        def test_foo() -> None:
            x = 0
            y = 1
            expected = 2
            testing.log_reference_outputs({"foo": expected})
            assert foo(x, y) == expected
        ```
    """
    if ls_utils.test_tracking_is_disabled():
        logger.info(
            "LANGSMITH_TEST_TRACKING is set to 'false'. Skipping log_reference_outputs."
        )
        return

    active_case = _TEST_CASE.get()
    if active_case:
        active_case.log_reference_outputs(reference_outputs)
        return

    raise ValueError(
        "log_reference_outputs should only be called within a pytest test "
        "decorated with @pytest.mark.langsmith."
    )
def log_feedback(
    feedback: Optional[Union[dict, list[dict]]] = None,
    /,
    *,
    key: Optional[str] = None,
    score: Optional[Union[int, bool, float]] = None,
    value: Optional[Union[str, int, float, bool]] = None,
    **kwargs: Any,
) -> None:
    """Log run feedback from within a pytest test run.

    !!! warning

        This API is in beta and might change in future versions.

    Should only be used in pytest tests decorated with @pytest.mark.langsmith.

    Feedback is given either as a pre-built `feedback` payload (positional) or
    through the `key` / `score` / `value` keywords, never both.

    Args:
        feedback: A pre-built feedback dict, or a list of them. Presumably each
            dict has the same shape as the one built from the keywords
            (`{"key": ..., "score": ..., "value": ...}`) -- confirm against
            the test case's `submit_feedback`.
        key: Feedback name.
        score: Numerical feedback value.
        value: Categorical feedback value
        kwargs: Any other Client.create_feedback args.

    Raises:
        ValueError: If both `feedback` and any of `key` / `score` / `value`
            are given, if neither `feedback` nor `key` is given, or if there
            is no current run tree or active test case.

    Example:
        ```python
        import pytest
        from langsmith import testing as t


        @pytest.mark.langsmith
        def test_foo() -> None:
            x = 0
            y = 1
            expected = 2
            result = foo(x, y)
            t.log_feedback(key="right_type", score=isinstance(result, int))
            assert result == expected
        ```
    """
    if ls_utils.test_tracking_is_disabled():
        logger.info("LANGSMITH_TEST_TRACKING is set to 'false'. Skipping log_feedback.")
        return

    # `score=0`, `score=False` and `value=""` are legitimate values, so test
    # them against None instead of relying on truthiness; otherwise they would
    # be silently discarded when passed alongside `feedback`.
    if feedback and (key or score is not None or value is not None):
        msg = "Must specify one of 'feedback' and ('key', 'score', 'value'), not both."
        raise ValueError(msg)
    if not (feedback or key):
        msg = "Must specify at least one of 'feedback' or ('key', 'score', 'value')."
        raise ValueError(msg)
    if key:
        feedback = {"key": key}
        if score is not None:
            feedback["score"] = score
        if value is not None:
            feedback["value"] = value

    run_tree = rh.get_current_run_tree()
    test_case = _TEST_CASE.get()
    if not run_tree or not test_case:
        msg = (
            "log_feedback should only be called within a pytest test decorated with "
            "@pytest.mark.langsmith, and with tracing enabled (by setting the "
            "LANGSMITH_TRACING environment variable to 'true')."
        )
        raise ValueError(msg)

    if run_tree.session_name == "evaluators" and run_tree.metadata.get(
        "reference_run_id"
    ):
        # The current run is an evaluator run (as opened by `trace_feedback`):
        # attach the feedback to the referenced test run, record it as this
        # run's outputs, and point the feedback back at this run as its source.
        run_id = run_tree.metadata["reference_run_id"]
        run_tree.add_outputs(
            feedback if isinstance(feedback, dict) else {"feedback": feedback}
        )
        kwargs["source_run_id"] = run_tree.id
    else:
        run_id = run_tree.trace_id
    test_case.submit_feedback(run_id, cast(Union[list, dict], feedback), **kwargs)
@contextlib.contextmanager
def trace_feedback(
    *, name: str = "Feedback"
) -> Generator[Optional[run_trees.RunTree], None, None]:
    """Trace the computation of a pytest run feedback as its own run.

    !!! warning

        This API is in beta and might change in future versions.

    The feedback run is opened in the "evaluators" project with the active test
    case's logged outputs as its inputs, and carries the test case's experiment
    name, example id and run id in its metadata.

    Args:
        name: Feedback run name. Defaults to "Feedback".

    Yields:
        The run tree of the feedback run, or `None` when test tracking is
        disabled.

    Raises:
        ValueError: If there is no active test case.

    Example:
        ```python
        import openai
        import pytest

        from langsmith import testing as t
        from langsmith import wrappers

        oai_client = wrappers.wrap_openai(openai.Client())


        @pytest.mark.langsmith
        def test_openai_says_hello():
            # Traced code will be included in the test case
            text = "Say hello!"
            response = oai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": "You are a helpful assistant."},
                    {"role": "user", "content": text},
                ],
            )
            t.log_inputs({"text": text})
            t.log_outputs({"response": response.choices[0].message.content})
            t.log_reference_outputs({"response": "hello!"})

            # Use this context manager to trace any steps used for generating evaluation
            # feedback separately from the main application logic
            with t.trace_feedback():
                grade = oai_client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[
                        {
                            "role": "system",
                            "content": "Return 1 if 'hello' is in the user message and 0 otherwise.",
                        },
                        {
                            "role": "user",
                            "content": response.choices[0].message.content,
                        },
                    ],
                )
                # Make sure to log relevant feedback within the context for the
                # trace to be associated with this feedback.
                t.log_feedback(
                    key="llm_judge", score=float(grade.choices[0].message.content)
                )

            assert "hello" in response.choices[0].message.content.lower()
        ```
    """  # noqa: E501
    if ls_utils.test_tracking_is_disabled():
        # Name this function in the log line (it previously said "log_feedback").
        logger.info(
            "LANGSMITH_TEST_TRACKING is set to 'false'. Skipping trace_feedback."
        )
        yield None
        return
    test_case = _TEST_CASE.get()
    if not test_case:
        msg = (
            "trace_feedback should only be called within a pytest test decorated with "
            "@pytest.mark.langsmith, and with tracing enabled (by setting the "
            "LANGSMITH_TRACING environment variable to 'true')."
        )
        raise ValueError(msg)
    metadata = {
        "experiment": test_case.test_suite.experiment.name,
        "reference_example_id": test_case.example_id,
        # `log_feedback` reads this key to attach feedback to the test run.
        "reference_run_id": test_case.run_id,
    }
    with rh.trace(
        name=name,
        inputs=test_case._logged_outputs,
        parent="ignore",
        project_name="evaluators",
        metadata=metadata,
    ) as run_tree:
        yield run_tree
1434def _stringify(x: Any) -> str:
1435 try:
1436 return dumps_json(x).decode("utf-8", errors="surrogateescape")
1437 except Exception:
1438 return str(x)
def _dumpd(x: Any) -> Any:
    """Serialize LangChain Serializable objects.

    Returns `x` unchanged when no `dumpd` function is available or when
    calling it raises.
    """
    serializer = _get_langchain_dumpd()
    if serializer:
        # A failing serializer is not fatal: fall through and return the input.
        with contextlib.suppress(Exception):
            return serializer(x)
    return x
1453@functools.lru_cache
1454def _get_langchain_dumpd() -> Optional[Callable]:
1455 try:
1456 from langchain_core.load import dumpd
1458 return dumpd
1459 except ImportError:
1460 return None