Coverage for langsmith/beta/_evals.py: 0%
95 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-12-11 16:15 -0800
« prev ^ index » next coverage.py v7.10.1, created at 2025-12-11 16:15 -0800
1"""Beta utility functions to assist in common eval workflows.
3These functions may change in the future.
4"""
6import collections
7import datetime
8import itertools
9import uuid
10from collections.abc import Sequence
11from typing import Optional, TypeVar
13import langsmith.run_trees as rt
14import langsmith.schemas as ls_schemas
15from langsmith import evaluation as ls_eval
16from langsmith._internal._beta_decorator import warn_beta
17from langsmith.client import Client
20def _convert_ids(run_dict: dict, id_map: dict) -> dict:
21 """Convert the IDs in the run dictionary using the provided ID map.
23 Parameters:
24 - run_dict: The dictionary representing a run.
25 - id_map: The dictionary mapping old IDs to new IDs.
27 Returns:
28 - dict: The updated run dictionary.
29 """
30 do = run_dict["dotted_order"]
31 for k, v in id_map.items():
32 do = do.replace(str(k), str(v))
33 run_dict["dotted_order"] = do
35 if run_dict.get("parent_run_id"):
36 run_dict["parent_run_id"] = id_map[run_dict["parent_run_id"]]
37 if not run_dict.get("extra"):
38 run_dict["extra"] = {}
39 return run_dict
42def _convert_root_run(root: ls_schemas.Run, run_to_example_map: dict) -> list[dict]:
43 """Convert the root run and its child runs to a list of dictionaries.
45 Parameters:
46 - root: The root run to convert.
47 - run_to_example_map: The dictionary mapping run IDs to example IDs.
49 Returns:
50 - The list of converted run dictionaries.
51 """
52 runs_ = [root]
53 trace_id = uuid.uuid4()
54 id_map = {root.trace_id: trace_id}
55 results = []
56 while runs_:
57 src = runs_.pop()
58 src_dict = src.dict(exclude={"parent_run_ids", "child_run_ids", "session_id"})
59 id_map[src_dict["id"]] = id_map.get(src_dict["id"], uuid.uuid4())
60 src_dict["id"] = id_map[src_dict["id"]]
61 src_dict["trace_id"] = id_map[src_dict["trace_id"]]
62 if src.child_runs:
63 runs_.extend(src.child_runs)
64 results.append(src_dict)
65 result = [_convert_ids(r, id_map) for r in results]
66 result[0]["reference_example_id"] = run_to_example_map[root.id]
67 return result
70@warn_beta
71def convert_runs_to_test(
72 runs: Sequence[ls_schemas.Run],
73 *,
74 dataset_name: str,
75 test_project_name: Optional[str] = None,
76 client: Optional[Client] = None,
77 load_child_runs: bool = False,
78 include_outputs: bool = False,
79) -> ls_schemas.TracerSession:
80 """Convert the following runs to a dataset + test.
82 This makes it easy to sample prod runs into a new regression testing
83 workflow and compare against a candidate system.
85 Internally, this function does the following:
86 1. Create a dataset from the provided production run inputs.
87 2. Create a new test project.
88 3. Clone the production runs and re-upload against the dataset.
90 Parameters:
91 - runs: A sequence of runs to be executed as a test.
92 - dataset_name: The name of the dataset to associate with the test runs.
93 - client: An optional LangSmith client instance. If not provided, a new client will
94 be created.
95 - load_child_runs: Whether to load child runs when copying runs.
97 Returns:
98 - The project containing the cloned runs.
100 Example:
101 --------
102 ```python
103 import langsmith
104 import random
106 client = langsmith.Client()
108 # Randomly sample 100 runs from a prod project
109 runs = list(client.list_runs(project_name="My Project", execution_order=1))
110 sampled_runs = random.sample(runs, min(len(runs), 100))
112 runs_as_test(runs, dataset_name="Random Runs")
114 # Select runs named "extractor" whose root traces received good feedback
115 runs = client.list_runs(
116 project_name="<your_project>",
117 filter='eq(name, "extractor")',
118 trace_filter='and(eq(feedback_key, "user_score"), eq(feedback_score, 1))',
119 )
120 runs_as_test(runs, dataset_name="Extraction Good")
121 ```
122 """
123 if not runs:
124 raise ValueError(f"""Expected a non-empty sequence of runs. Received: {runs}""")
125 client = client or rt.get_cached_client()
126 ds = client.create_dataset(dataset_name=dataset_name)
127 outputs = [r.outputs for r in runs] if include_outputs else None
128 client.create_examples(
129 inputs=[r.inputs for r in runs],
130 outputs=outputs,
131 source_run_ids=[r.id for r in runs],
132 dataset_id=ds.id,
133 )
135 if not load_child_runs:
136 runs_to_copy = runs
137 else:
138 runs_to_copy = [
139 client.read_run(r.id, load_child_runs=load_child_runs) for r in runs
140 ]
142 test_project_name = test_project_name or f"prod-baseline-{uuid.uuid4().hex[:6]}"
144 examples = list(client.list_examples(dataset_name=dataset_name))
145 run_to_example_map = {e.source_run_id: e.id for e in examples}
146 dataset_version = (
147 examples[0].modified_at if examples[0].modified_at else examples[0].created_at
148 )
150 to_create = [
151 run_dict
152 for root_run in runs_to_copy
153 for run_dict in _convert_root_run(root_run, run_to_example_map)
154 ]
156 project = client.create_project(
157 project_name=test_project_name,
158 reference_dataset_id=ds.id,
159 metadata={
160 "which": "prod-baseline",
161 "dataset_version": dataset_version.isoformat(),
162 },
163 )
165 for new_run in to_create:
166 latency = new_run["end_time"] - new_run["start_time"]
167 new_run["start_time"] = datetime.datetime.now(tz=datetime.timezone.utc)
168 new_run["end_time"] = new_run["start_time"] + latency
169 client.create_run(**new_run, project_name=test_project_name)
171 _ = client.update_project(
172 project.id,
173 )
174 return project
177def _load_nested_traces(project_name: str, client: Client) -> list[ls_schemas.Run]:
178 runs = client.list_runs(project_name=project_name)
179 treemap: collections.defaultdict[uuid.UUID, list[ls_schemas.Run]] = (
180 collections.defaultdict(list)
181 )
182 results = []
183 all_runs = {}
184 for run in runs:
185 if run.parent_run_id is not None:
186 treemap[run.parent_run_id].append(run)
187 else:
188 results.append(run)
189 all_runs[run.id] = run
190 for run_id, child_runs in treemap.items():
191 all_runs[run_id].child_runs = sorted(child_runs, key=lambda r: r.dotted_order)
192 return results
195T = TypeVar("T")
196U = TypeVar("U")
199def _outer_product(list1: list[T], list2: list[U]) -> list[tuple[T, U]]:
200 return list(itertools.product(list1, list2))
203@warn_beta
204def compute_test_metrics(
205 project_name: str,
206 *,
207 evaluators: list,
208 max_concurrency: Optional[int] = 10,
209 client: Optional[Client] = None,
210) -> None:
211 """Compute test metrics for a given test name using a list of evaluators.
213 Args:
214 project_name (str): The name of the test project to evaluate.
215 evaluators (list): A list of evaluators to compute metrics with.
216 max_concurrency (Optional[int], optional): The maximum number of concurrent
217 evaluations. Defaults to 10.
218 client (Optional[Client], optional): The client to use for evaluations.
219 Defaults to None.
221 Returns:
222 None: This function does not return any value.
223 """
224 from langsmith import ContextThreadPoolExecutor
226 evaluators_: list[ls_eval.RunEvaluator] = []
227 for func in evaluators:
228 if isinstance(func, ls_eval.RunEvaluator):
229 evaluators_.append(func)
230 elif callable(func):
231 evaluators_.append(ls_eval.run_evaluator(func))
232 else:
233 raise NotImplementedError(
234 f"Evaluation not yet implemented for evaluator of type {type(func)}"
235 )
236 client = client or rt.get_cached_client()
237 traces = _load_nested_traces(project_name, client)
238 with ContextThreadPoolExecutor(max_workers=max_concurrency) as executor:
239 results = executor.map(
240 client.evaluate_run, *zip(*_outer_product(traces, evaluators_))
241 )
242 for _ in results:
243 pass