Coverage for langsmith/integrations/openai_agents_sdk/_openai_agents.py: 18%

126 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-12-11 16:15 -0800

1import logging 

2from datetime import datetime 

3from typing import Optional 

4 

5from langsmith import run_trees as rt 

6from langsmith._internal import _context 

7from langsmith.run_helpers import get_current_run_tree 

8 

9try: 

10 from agents import tracing # type: ignore[import] 

11 

12 required = ( 

13 "TracingProcessor", 

14 "Trace", 

15 "Span", 

16 "ResponseSpanData", 

17 ) 

18 if not all(hasattr(tracing, name) for name in required): 

19 raise ImportError("The `agents` package is not installed.") 

20 

21 from langsmith.integrations.openai_agents_sdk import ( 

22 _openai_agent_utils as agent_utils, 

23 ) 

24 

25 HAVE_AGENTS = True 

26except ImportError: 

27 HAVE_AGENTS = False 

28 

29 class OpenAIAgentsTracingProcessor: 

30 """Tracing processor for the [OpenAI Agents SDK](https://openai.github.io/openai-agents-python/). 

31 

32 Traces all intermediate steps of your OpenAI Agent to LangSmith. 

33 

34 Requirements: Make sure to install `pip install -U langsmith[openai-agents]`. 

35 

36 Args: 

37 client: An instance of `langsmith.client.Client`. If not provided, a default 

38 client is created. 

39 

40 Example: 

41 ```python 

42 from agents import ( 

43 Agent, 

44 FileSearchTool, 

45 Runner, 

46 WebSearchTool, 

47 function_tool, 

48 set_trace_processors, 

49 ) 

50 

51 from langsmith.wrappers import OpenAIAgentsTracingProcessor 

52 

53 set_trace_processors([OpenAIAgentsTracingProcessor()]) 

54 

55 

56 @function_tool 

57 def get_weather(city: str) -> str: 

58 return f"The weather in {city} is sunny" 

59 

60 

61 haiku_agent = Agent( 

62 name="Haiku agent", 

63 instructions="Always respond in haiku form", 

64 model="o3-mini", 

65 tools=[get_weather], 

66 ) 

67 agent = Agent( 

68 name="Assistant", 

69 tools=[WebSearchTool()], 

70 instructions="speak in spanish. use Haiku agent if they ask for a haiku or for the weather", 

71 handoffs=[haiku_agent], 

72 ) 

73 

74 result = await Runner.run( 

75 agent, 

76 "write a haiku about the weather today and tell me a recent news story about new york", 

77 ) 

78 print(result.final_output) 

79 ``` 

80 """ # noqa: E501 

81 

82 def __init__(self, *args, **kwargs): 

83 raise ImportError( 

84 "The `agents` package is not installed. " 

85 "Please install it with `pip install langsmith[openai-agents]`." 

86 ) 

87 

88 

89from langsmith import client as ls_client 

90 

91logger = logging.getLogger(__name__) 

92 

93if HAVE_AGENTS: 

94 

95 class OpenAIAgentsTracingProcessor(tracing.TracingProcessor): # type: ignore[no-redef] 

96 """Tracing processor for the [OpenAI Agents SDK](https://openai.github.io/openai-agents-python/). 

97 

98 Traces all intermediate steps of your OpenAI Agent to LangSmith. 

99 

100 Requirements: Make sure to install `pip install -U langsmith[openai-agents]`. 

101 

102 Args: 

103 client: An instance of `langsmith.client.Client`. If not provided, 

104 a default client is created. 

105 metadata: Metadata to associate with all traces. 

106 tags: Tags to associate with all traces. 

107 project_name: LangSmith project to trace to. 

108 name: Name of the root trace. 

109 

110 Example: 

111 ```python 

112 from agents import ( 

113 Agent, 

114 FileSearchTool, 

115 Runner, 

116 WebSearchTool, 

117 function_tool, 

118 set_trace_processors, 

119 ) 

120 

121 from langsmith.wrappers import OpenAIAgentsTracingProcessor 

122 

123 set_trace_processors([OpenAIAgentsTracingProcessor()]) 

124 

125 

126 @function_tool 

127 def get_weather(city: str) -> str: 

128 return f"The weather in {city} is sunny" 

129 

130 

131 haiku_agent = Agent( 

132 name="Haiku agent", 

133 instructions="Always respond in haiku form", 

134 model="o3-mini", 

135 tools=[get_weather], 

136 ) 

137 agent = Agent( 

138 name="Assistant", 

139 tools=[WebSearchTool()], 

140 instructions="speak in spanish. use Haiku agent if they ask for a haiku or for the weather", 

141 handoffs=[haiku_agent], 

142 ) 

143 

144 result = await Runner.run( 

145 agent, 

146 "write a haiku about the weather today and tell me a recent news story about new york", 

147 ) 

148 print(result.final_output) 

149 ``` 

150 """ # noqa: E501 

151 

152 def __init__( 

153 self, 

154 client: Optional[ls_client.Client] = None, 

155 *, 

156 metadata: Optional[dict] = None, 

157 tags: Optional[list[str]] = None, 

158 project_name: Optional[str] = None, 

159 name: Optional[str] = None, 

160 ): 

161 self.client = client or rt.get_cached_client() 

162 self._metadata = metadata 

163 self._tags = tags 

164 self._project_name = project_name 

165 self._name = name 

166 self._first_response_inputs: dict = {} 

167 self._last_response_outputs: dict = {} 

168 

169 self._runs: dict[str, rt.RunTree] = {} 

170 

171 def on_trace_start(self, trace: tracing.Trace) -> None: 

172 current_run_tree = get_current_run_tree() 

173 

174 # Determine run name 

175 if self._name: 

176 run_name = self._name 

177 elif trace.name: 

178 run_name = trace.name 

179 else: 

180 run_name = "Agent workflow" 

181 

182 # Build metadata 

183 run_extra = {"metadata": self._metadata or {}} 

184 trace_dict = trace.export() or {} 

185 if trace_dict.get("group_id") is not None: 

186 run_extra["metadata"]["thread_id"] = trace_dict["group_id"] 

187 

188 try: 

189 if current_run_tree is not None: 

190 # Nest under existing trace 

191 new_run = current_run_tree.create_child( 

192 name=run_name, 

193 run_type="chain", 

194 inputs={}, 

195 extra=run_extra, 

196 tags=self._tags, 

197 ) 

198 else: 

199 # Create new root trace 

200 run_kwargs = { 

201 "name": run_name, 

202 "run_type": "chain", 

203 "inputs": {}, 

204 "extra": run_extra, 

205 "tags": self._tags, 

206 "client": self.client, 

207 } 

208 if self._project_name is not None: 

209 run_kwargs["project_name"] = self._project_name 

210 new_run = rt.RunTree(**run_kwargs) # type: ignore[arg-type] 

211 

212 new_run.post() 

213 _context._PARENT_RUN_TREE.set(new_run) 

214 self._runs[trace.trace_id] = new_run 

215 except Exception as e: 

216 logger.exception(f"Error creating trace run: {e}") 

217 

218 def on_trace_end(self, trace: tracing.Trace) -> None: 

219 run = self._runs.pop(trace.trace_id, None) 

220 if not run: 

221 return 

222 

223 trace_dict = trace.export() or {} 

224 metadata = {**(trace_dict.get("metadata") or {}), **(self._metadata or {})} 

225 

226 try: 

227 # Update run with final inputs/outputs 

228 run.inputs = self._first_response_inputs.pop(trace.trace_id, {}) 

229 run.outputs = self._last_response_outputs.pop(trace.trace_id, {}) 

230 

231 # Update metadata 

232 if "metadata" not in run.extra: 

233 run.extra["metadata"] = {} 

234 run.extra["metadata"].update(metadata) 

235 

236 # End and patch 

237 run.end() 

238 run.patch() 

239 except Exception as e: 

240 logger.exception(f"Error updating trace run: {e}") 

241 

242 def on_span_start(self, span: tracing.Span) -> None: 

243 # Find parent run 

244 parent_run = ( 

245 self._runs.get(span.parent_id) 

246 if span.parent_id 

247 else self._runs.get(span.trace_id) 

248 ) 

249 

250 if parent_run is None: 

251 logger.warning( 

252 f"No trace info found for span, skipping: {span.span_id}" 

253 ) 

254 return 

255 

256 # Extract span data 

257 run_name = agent_utils.get_run_name(span) 

258 if isinstance(span.span_data, tracing.ResponseSpanData): 

259 parent_name = parent_run.name 

260 raw_span_name = getattr(span, "name", None) or getattr( 

261 span.span_data, "name", None 

262 ) 

263 span_name = str(raw_span_name) if raw_span_name else run_name 

264 if parent_name: 

265 run_name = f"{parent_name} {span_name}".strip() 

266 else: 

267 run_name = span_name 

268 

269 run_type = agent_utils.get_run_type(span) 

270 extracted = agent_utils.extract_span_data(span) 

271 

272 try: 

273 # Create child run 

274 child_run = parent_run.create_child( 

275 name=run_name, 

276 run_type=run_type, 

277 inputs=extracted.get("inputs", {}), 

278 extra=extracted, 

279 start_time=datetime.fromisoformat(span.started_at) 

280 if span.started_at 

281 else None, 

282 ) 

283 

284 child_run.post() 

285 _context._PARENT_RUN_TREE.set(child_run) 

286 self._runs[span.span_id] = child_run 

287 except Exception as e: 

288 logger.exception(f"Error creating span run: {e}") 

289 

290 def on_span_end(self, span: tracing.Span) -> None: 

291 run = self._runs.pop(span.span_id, None) 

292 if not run: 

293 return 

294 

295 try: 

296 # Extract outputs and metadata 

297 extracted = agent_utils.extract_span_data(span) 

298 outputs = extracted.pop("outputs", {}) 

299 inputs = extracted.pop("inputs", {}) 

300 

301 # Update run 

302 run.outputs = outputs 

303 if inputs: 

304 run.inputs = inputs 

305 if error := span.error: 

306 run.error = str(error) 

307 

308 # Add OpenAI metadata 

309 if "metadata" not in run.extra: 

310 run.extra["metadata"] = {} 

311 run.extra["metadata"].update( 

312 { 

313 "openai_parent_id": span.parent_id, 

314 "openai_trace_id": span.trace_id, 

315 "openai_span_id": span.span_id, 

316 } 

317 ) 

318 # Merge any additional metadata from extracted 

319 if metadata := extracted.get("metadata"): 

320 run.extra["metadata"].update(metadata) 

321 

322 # Track first/last response inputs/outputs for trace 

323 if isinstance(span.span_data, tracing.ResponseSpanData): 

324 self._first_response_inputs[span.trace_id] = ( 

325 self._first_response_inputs.get(span.trace_id) or inputs 

326 ) 

327 self._last_response_outputs[span.trace_id] = outputs 

328 elif isinstance(span.span_data, tracing.GenerationSpanData): 

329 # Use generation spans as fallback if no response spans exist 

330 self._first_response_inputs[span.trace_id] = ( 

331 self._first_response_inputs.get(span.trace_id) or inputs 

332 ) 

333 self._last_response_outputs[span.trace_id] = outputs 

334 

335 # End and patch 

336 if span.ended_at: 

337 run.end_time = datetime.fromisoformat(span.ended_at) 

338 else: 

339 run.end() 

340 

341 run.patch() 

342 except Exception as e: 

343 logger.exception(f"Error updating span run: {e}") 

344 

345 def shutdown(self) -> None: 

346 self.client.flush() 

347 

348 def force_flush(self) -> None: 

349 self.client.flush()