Coverage for langsmith/beta/_evals.py: 0%

95 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-12-11 16:15 -0800

1"""Beta utility functions to assist in common eval workflows. 

2 

3These functions may change in the future. 

4""" 

5 

6import collections 

7import datetime 

8import itertools 

9import uuid 

10from collections.abc import Sequence 

11from typing import Optional, TypeVar 

12 

13import langsmith.run_trees as rt 

14import langsmith.schemas as ls_schemas 

15from langsmith import evaluation as ls_eval 

16from langsmith._internal._beta_decorator import warn_beta 

17from langsmith.client import Client 

18 

19 

20def _convert_ids(run_dict: dict, id_map: dict) -> dict: 

21 """Convert the IDs in the run dictionary using the provided ID map. 

22 

23 Parameters: 

24 - run_dict: The dictionary representing a run. 

25 - id_map: The dictionary mapping old IDs to new IDs. 

26 

27 Returns: 

28 - dict: The updated run dictionary. 

29 """ 

30 do = run_dict["dotted_order"] 

31 for k, v in id_map.items(): 

32 do = do.replace(str(k), str(v)) 

33 run_dict["dotted_order"] = do 

34 

35 if run_dict.get("parent_run_id"): 

36 run_dict["parent_run_id"] = id_map[run_dict["parent_run_id"]] 

37 if not run_dict.get("extra"): 

38 run_dict["extra"] = {} 

39 return run_dict 

40 

41 

def _convert_root_run(root: ls_schemas.Run, run_to_example_map: dict) -> list[dict]:
    """Convert the root run and its child runs to a list of dictionaries.

    The copy of ``root`` becomes the root of a brand-new trace. It gets a fresh
    ID that is also used as the new trace ID, its ``parent_run_id`` is cleared,
    and any ancestor segments preceding it in ``dotted_order`` are dropped from
    every copied run. This lets runs that were nested inside a larger trace
    (e.g. runs selected by name) be copied on their own; their original parent
    is never part of the copy.

    Parameters:
    - root: The run to convert. Its ``child_runs`` (if populated) are copied too.
    - run_to_example_map: The dictionary mapping run IDs to example IDs. The
      copied root is linked to ``run_to_example_map[root.id]``.

    Returns:
    - The list of converted run dictionaries, root first; every run appears
      after its parent.
    """
    new_trace_id = uuid.uuid4()
    # The copied root starts a new trace, so its new ID *is* the new trace ID.
    # Both the old trace ID and the old root ID map to it (they are the same key
    # when ``root`` was already a trace root).
    id_map = {root.trace_id: new_trace_id, root.id: new_trace_id}

    # dotted_order segments are joined with "."; everything before the root's own
    # (last) segment belongs to ancestors that are not being copied.
    ancestor_prefix = ""
    if root.dotted_order and "." in root.dotted_order:
        ancestor_prefix = root.dotted_order.rsplit(".", 1)[0] + "."

    stack = [root]
    results = []
    while stack:
        src = stack.pop()
        src_dict = src.dict(exclude={"parent_run_ids", "child_run_ids", "session_id"})
        old_id = src_dict["id"]
        if old_id not in id_map:
            id_map[old_id] = uuid.uuid4()
        src_dict["id"] = id_map[old_id]
        src_dict["trace_id"] = id_map[src_dict["trace_id"]]
        dotted_order = src_dict.get("dotted_order") or ""
        if ancestor_prefix and dotted_order.startswith(ancestor_prefix):
            src_dict["dotted_order"] = dotted_order[len(ancestor_prefix) :]
        if src.child_runs:
            stack.extend(src.child_runs)
        results.append(src_dict)

    # The copied root has no parent in the new trace; clearing it here also keeps
    # _convert_ids from looking up an ancestor that was never mapped.
    results[0]["parent_run_id"] = None
    result = [_convert_ids(r, id_map) for r in results]
    result[0]["reference_example_id"] = run_to_example_map[root.id]
    return result

68 

69 

@warn_beta
def convert_runs_to_test(
    runs: Sequence[ls_schemas.Run],
    *,
    dataset_name: str,
    test_project_name: Optional[str] = None,
    client: Optional[Client] = None,
    load_child_runs: bool = False,
    include_outputs: bool = False,
) -> ls_schemas.TracerSession:
    """Convert the following runs to a dataset + test.

    This makes it easy to sample prod runs into a new regression testing
    workflow and compare against a candidate system.

    Internally, this function does the following:
    1. Create a dataset from the provided production run inputs.
    2. Create a new test project.
    3. Clone the production runs and re-upload against the dataset.

    Parameters:
    - runs: The runs to be executed as a test. Any iterable is accepted
      (including the iterator returned by ``Client.list_runs``); it is
      materialized into a list once, up front.
    - dataset_name: The name of the dataset to create and associate with the
      test runs.
    - test_project_name: Name of the test project to create. Defaults to
      ``"prod-baseline-"`` followed by 6 random hex characters.
    - client: An optional LangSmith client instance. If not provided, the
      cached client from ``langsmith.run_trees.get_cached_client()`` is used.
    - load_child_runs: Whether to load child runs when copying runs.
    - include_outputs: Whether to store each run's outputs as the outputs of
      its dataset example.

    Returns:
    - The project containing the cloned runs.

    Raises:
    - ValueError: If ``runs`` is empty.

    Example:
    --------
    ```python
    import langsmith
    import random

    client = langsmith.Client()

    # Randomly sample 100 root runs from a prod project
    runs = list(client.list_runs(project_name="My Project", is_root=True))
    sampled_runs = random.sample(runs, min(len(runs), 100))

    convert_runs_to_test(sampled_runs, dataset_name="Random Runs")

    # Select runs named "extractor" whose root traces received good feedback
    runs = client.list_runs(
        project_name="<your_project>",
        filter='eq(name, "extractor")',
        trace_filter='and(eq(feedback_key, "user_score"), eq(feedback_score, 1))',
    )
    convert_runs_to_test(runs, dataset_name="Extraction Good")
    ```
    """
    # Materialize once: ``runs`` is iterated several times below, and a one-shot
    # iterator is always truthy, so it would slip past the emptiness check and
    # then be exhausted after the first pass.
    runs_list = list(runs)
    if not runs_list:
        raise ValueError(f"""Expected a non-empty sequence of runs. Received: {runs}""")
    client = client or rt.get_cached_client()
    ds = client.create_dataset(dataset_name=dataset_name)
    outputs = [r.outputs for r in runs_list] if include_outputs else None
    client.create_examples(
        inputs=[r.inputs for r in runs_list],
        outputs=outputs,
        source_run_ids=[r.id for r in runs_list],
        dataset_id=ds.id,
    )

    if load_child_runs:
        runs_to_copy = [client.read_run(r.id, load_child_runs=True) for r in runs_list]
    else:
        runs_to_copy = runs_list

    test_project_name = test_project_name or f"prod-baseline-{uuid.uuid4().hex[:6]}"

    # List by the ID of the dataset just created rather than re-resolving the name.
    examples = list(client.list_examples(dataset_id=ds.id))
    run_to_example_map = {e.source_run_id: e.id for e in examples}
    # The dataset "version" is its most recent change, not whichever example the
    # listing happens to return first.
    dataset_version = max((e.modified_at or e.created_at) for e in examples)

    to_create = [
        run_dict
        for root_run in runs_to_copy
        for run_dict in _convert_root_run(root_run, run_to_example_map)
    ]

    project = client.create_project(
        project_name=test_project_name,
        reference_dataset_id=ds.id,
        metadata={
            "which": "prod-baseline",
            "dataset_version": dataset_version.isoformat(),
        },
    )

    for new_run in to_create:
        old_start, old_end = new_run["start_time"], new_run["end_time"]
        # Re-stamp the copy at "now" while keeping the original duration.
        new_run["start_time"] = datetime.datetime.now(tz=datetime.timezone.utc)
        # A run that never finished has no end_time; keep it unset rather than
        # failing on the subtraction.
        new_run["end_time"] = (
            None if old_end is None else new_run["start_time"] + (old_end - old_start)
        )
        client.create_run(**new_run, project_name=test_project_name)

    # NOTE(review): called with only the project ID; presumably intended to
    # update a field (e.g. end_time) once all runs are uploaded — confirm intent.
    _ = client.update_project(
        project.id,
    )
    return project

175 

176 

def _load_nested_traces(project_name: str, client: Client) -> list[ls_schemas.Run]:
    """Fetch all runs of a project and stitch them into trees.

    Every run that is the parent of at least one listed run gets its
    ``child_runs`` replaced by its direct children, sorted by ``dotted_order``.

    Args:
        project_name: Project whose runs are listed via ``client.list_runs``.
        client: Client used to list the runs.

    Returns:
        The runs whose ``parent_run_id`` is None, in listing order.

    Raises:
        KeyError: If a listed run's parent is not itself among the listed runs.
    """
    listing = client.list_runs(project_name=project_name)
    by_id = {}
    roots = []
    children_of: collections.defaultdict[uuid.UUID, list[ls_schemas.Run]] = (
        collections.defaultdict(list)
    )
    for item in listing:
        by_id[item.id] = item
        if item.parent_run_id is None:
            roots.append(item)
        else:
            children_of[item.parent_run_id].append(item)
    for parent_id, kids in children_of.items():
        by_id[parent_id].child_runs = sorted(kids, key=lambda r: r.dotted_order)
    return roots

193 

194 

195T = TypeVar("T") 

196U = TypeVar("U") 

197 

198 

199def _outer_product(list1: list[T], list2: list[U]) -> list[tuple[T, U]]: 

200 return list(itertools.product(list1, list2)) 

201 

202 

@warn_beta
def compute_test_metrics(
    project_name: str,
    *,
    evaluators: list,
    max_concurrency: Optional[int] = 10,
    client: Optional[Client] = None,
) -> None:
    """Compute test metrics for a test project using a list of evaluators.

    Every root trace in the project (with its child runs attached) is passed
    to ``client.evaluate_run`` once per evaluator.

    Args:
        project_name (str): The name of the test project to evaluate.
        evaluators (list): ``RunEvaluator`` instances or plain callables;
            callables are wrapped with ``langsmith.evaluation.run_evaluator``.
        max_concurrency (Optional[int], optional): The maximum number of concurrent
            evaluations. Defaults to 10.
        client (Optional[Client], optional): The client to use for evaluations.
            Defaults to None, in which case the cached client is used.

    Returns:
        None: This function does not return any value.

    Raises:
        NotImplementedError: If an evaluator is neither a ``RunEvaluator`` nor
            callable.
        Exception: The first error raised by ``client.evaluate_run``, in
            (trace, evaluator) order, is propagated.
    """
    from langsmith import ContextThreadPoolExecutor

    evaluators_: list[ls_eval.RunEvaluator] = []
    for func in evaluators:
        if isinstance(func, ls_eval.RunEvaluator):
            evaluators_.append(func)
        elif callable(func):
            evaluators_.append(ls_eval.run_evaluator(func))
        else:
            raise NotImplementedError(
                f"Evaluation not yet implemented for evaluator of type {type(func)}"
            )
    client = client or rt.get_cached_client()
    traces = _load_nested_traces(project_name, client)
    pairs = _outer_product(traces, evaluators_)
    if not pairs:
        # Empty project or no evaluators: nothing to do. Returning here also
        # avoids calling the executor's ``map`` with zero iterables.
        # NOTE(review): LangSmith's ContextThreadPoolExecutor.map appears to
        # index ``iterables[0]``, which would raise IndexError in that case.
        return
    with ContextThreadPoolExecutor(max_workers=max_concurrency) as executor:
        futures = [
            executor.submit(client.evaluate_run, run, evaluator)
            for run, evaluator in pairs
        ]
        # Wait in submission order; ``result()`` re-raises the first failure
        # instead of letting it pass silently.
        for future in futures:
            future.result()