Coverage for langsmith/integrations/openai_agents_sdk/_openai_agents.py: 18%
126 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-12-11 16:15 -0800
« prev ^ index » next coverage.py v7.10.1, created at 2025-12-11 16:15 -0800
1import logging
2from datetime import datetime
3from typing import Optional
5from langsmith import run_trees as rt
6from langsmith._internal import _context
7from langsmith.run_helpers import get_current_run_tree
try:
    from agents import tracing  # type: ignore[import]

    # Symbols of the Agents SDK tracing module that this integration uses.
    # If any is absent the SDK is treated as unavailable.
    required = (
        "TracingProcessor",
        "Trace",
        "Span",
        "ResponseSpanData",
    )
    if any(not hasattr(tracing, name) for name in required):
        raise ImportError("The `agents` package is not installed.")

    from langsmith.integrations.openai_agents_sdk import (
        _openai_agent_utils as agent_utils,
    )
except ImportError:
    HAVE_AGENTS = False

    class OpenAIAgentsTracingProcessor:
        """Placeholder for the [OpenAI Agents SDK](https://openai.github.io/openai-agents-python/) tracing processor.

        This stand-in is defined when the `agents` package (or one of the
        tracing symbols this module needs) cannot be imported. It keeps the
        name importable, but instantiating it always fails.

        The real processor traces all intermediate steps of your OpenAI Agent
        to LangSmith.

        Requirements: Make sure to install `pip install -U langsmith[openai-agents]`.

        Args:
            client: An instance of `langsmith.client.Client`. If not provided, a default
                client is created.

        Raises:
            ImportError: Always, on instantiation.

        Example:
            ```python
            from agents import (
                Agent,
                FileSearchTool,
                Runner,
                WebSearchTool,
                function_tool,
                set_trace_processors,
            )

            from langsmith.wrappers import OpenAIAgentsTracingProcessor

            set_trace_processors([OpenAIAgentsTracingProcessor()])


            @function_tool
            def get_weather(city: str) -> str:
                return f"The weather in {city} is sunny"


            haiku_agent = Agent(
                name="Haiku agent",
                instructions="Always respond in haiku form",
                model="o3-mini",
                tools=[get_weather],
            )
            agent = Agent(
                name="Assistant",
                tools=[WebSearchTool()],
                instructions="speak in spanish. use Haiku agent if they ask for a haiku or for the weather",
                handoffs=[haiku_agent],
            )

            result = await Runner.run(
                agent,
                "write a haiku about the weather today and tell me a recent news story about new york",
            )
            print(result.final_output)
            ```
        """  # noqa: E501

        def __init__(self, *args, **kwargs):
            """Reject construction: the optional dependency is unavailable."""
            raise ImportError(
                "The `agents` package is not installed. "
                "Please install it with `pip install langsmith[openai-agents]`."
            )
else:
    # Reached only when every import and the attribute check above succeeded.
    HAVE_AGENTS = True
89from langsmith import client as ls_client
91logger = logging.getLogger(__name__)
if HAVE_AGENTS:

    class OpenAIAgentsTracingProcessor(tracing.TracingProcessor):  # type: ignore[no-redef]
        """Tracing processor for the [OpenAI Agents SDK](https://openai.github.io/openai-agents-python/).

        Traces all intermediate steps of your OpenAI Agent to LangSmith.

        Each SDK trace becomes one LangSmith run and each SDK span becomes a
        child run of its parent span (or of the trace run). Errors raised
        while creating or updating runs are logged and swallowed by the
        handlers' ``try`` blocks.

        Requirements: Make sure to install `pip install -U langsmith[openai-agents]`.

        Args:
            client: An instance of `langsmith.client.Client`. If not provided,
                a default client is created.
            metadata: Metadata to associate with all traces.
            tags: Tags to associate with all traces.
            project_name: LangSmith project to trace to.
            name: Name of the root trace.

        Example:
            ```python
            from agents import (
                Agent,
                FileSearchTool,
                Runner,
                WebSearchTool,
                function_tool,
                set_trace_processors,
            )

            from langsmith.wrappers import OpenAIAgentsTracingProcessor

            set_trace_processors([OpenAIAgentsTracingProcessor()])


            @function_tool
            def get_weather(city: str) -> str:
                return f"The weather in {city} is sunny"


            haiku_agent = Agent(
                name="Haiku agent",
                instructions="Always respond in haiku form",
                model="o3-mini",
                tools=[get_weather],
            )
            agent = Agent(
                name="Assistant",
                tools=[WebSearchTool()],
                instructions="speak in spanish. use Haiku agent if they ask for a haiku or for the weather",
                handoffs=[haiku_agent],
            )

            result = await Runner.run(
                agent,
                "write a haiku about the weather today and tell me a recent news story about new york",
            )
            print(result.final_output)
            ```
        """  # noqa: E501

        def __init__(
            self,
            client: Optional[ls_client.Client] = None,
            *,
            metadata: Optional[dict] = None,
            tags: Optional[list[str]] = None,
            project_name: Optional[str] = None,
            name: Optional[str] = None,
        ):
            """Store the configuration and initialise the per-trace bookkeeping.

            Args:
                client: LangSmith client; falls back to ``rt.get_cached_client()``.
                metadata: Metadata merged into every trace run.
                tags: Tags passed to every trace run.
                project_name: Project used when a new root run is created.
                name: Overrides the trace name for the root run.
            """
            self.client = client or rt.get_cached_client()
            self._metadata = metadata
            self._tags = tags
            self._project_name = project_name
            self._name = name
            # Keyed by trace id: inputs of the first and outputs of the last
            # response/generation span, copied onto the trace run when it ends.
            self._first_response_inputs: dict = {}
            self._last_response_outputs: dict = {}

            # Open runs, keyed by trace id (trace runs) or span id (span runs).
            self._runs: dict[str, rt.RunTree] = {}

        def on_trace_start(self, trace: tracing.Trace) -> None:
            """Create and post the run that represents ``trace``.

            The run is nested under the current LangSmith run tree when one
            exists; otherwise a new root run is created. The run name is the
            configured ``name``, else ``trace.name``, else ``"Agent workflow"``.

            Args:
                trace: The SDK trace that just started.
            """
            current_run_tree = get_current_run_tree()
            run_name = self._name or trace.name or "Agent workflow"

            # Copy the configured metadata: ``thread_id`` is specific to this
            # trace and must not be written into the dict shared by all traces.
            run_metadata = dict(self._metadata or {})
            trace_dict = trace.export() or {}
            group_id = trace_dict.get("group_id")
            if group_id is not None:
                run_metadata["thread_id"] = group_id
            run_extra = {"metadata": run_metadata}

            try:
                if current_run_tree is not None:
                    # Nest under existing trace
                    new_run = current_run_tree.create_child(
                        name=run_name,
                        run_type="chain",
                        inputs={},
                        extra=run_extra,
                        tags=self._tags,
                    )
                else:
                    # Create new root trace
                    run_kwargs = {
                        "name": run_name,
                        "run_type": "chain",
                        "inputs": {},
                        "extra": run_extra,
                        "tags": self._tags,
                        "client": self.client,
                    }
                    if self._project_name is not None:
                        run_kwargs["project_name"] = self._project_name
                    new_run = rt.RunTree(**run_kwargs)  # type: ignore[arg-type]

                new_run.post()
                # NOTE(review): the context variable is set here but not
                # restored in on_trace_end - confirm this is intended.
                _context._PARENT_RUN_TREE.set(new_run)
                self._runs[trace.trace_id] = new_run
            except Exception as e:
                logger.exception("Error creating trace run: %s", e)

        def on_trace_end(self, trace: tracing.Trace) -> None:
            """Finalise and patch the run created for ``trace``.

            Does nothing when no run is registered for the trace id.

            Args:
                trace: The SDK trace that just ended.
            """
            run = self._runs.pop(trace.trace_id, None)
            if not run:
                return

            trace_dict = trace.export() or {}
            # Configured metadata takes precedence over the trace's own.
            metadata = {**(trace_dict.get("metadata") or {}), **(self._metadata or {})}

            try:
                # Update run with final inputs/outputs
                run.inputs = self._first_response_inputs.pop(trace.trace_id, {})
                run.outputs = self._last_response_outputs.pop(trace.trace_id, {})

                # Update metadata
                run.extra.setdefault("metadata", {}).update(metadata)

                # End and patch
                run.end()
                run.patch()
            except Exception as e:
                logger.exception("Error updating trace run: %s", e)

        def on_span_start(self, span: tracing.Span) -> None:
            """Create and post a child run for ``span``.

            The parent is the run registered for ``span.parent_id`` or, when
            the span has no parent id, the run registered for its trace id.
            Spans whose parent run is unknown are skipped with a warning.

            Args:
                span: The SDK span that just started.
            """
            parent_run = self._runs.get(span.parent_id or span.trace_id)
            if parent_run is None:
                logger.warning(
                    "No trace info found for span, skipping: %s", span.span_id
                )
                return

            try:
                # Extraction happens inside the ``try`` (as in on_span_end) so a
                # helper failure is logged instead of propagating to the caller.
                run_name = agent_utils.get_run_name(span)
                if isinstance(span.span_data, tracing.ResponseSpanData):
                    # Response spans are named "<parent name> <span name>".
                    raw_span_name = getattr(span, "name", None) or getattr(
                        span.span_data, "name", None
                    )
                    span_name = str(raw_span_name) if raw_span_name else run_name
                    parent_name = parent_run.name
                    if parent_name:
                        run_name = f"{parent_name} {span_name}".strip()
                    else:
                        run_name = span_name

                run_type = agent_utils.get_run_type(span)
                extracted = agent_utils.extract_span_data(span)
                start_time = (
                    datetime.fromisoformat(span.started_at)
                    if span.started_at
                    else None
                )

                # Create child run
                child_run = parent_run.create_child(
                    name=run_name,
                    run_type=run_type,
                    inputs=extracted.get("inputs", {}),
                    extra=extracted,
                    start_time=start_time,
                )

                child_run.post()
                # NOTE(review): the context variable is set here but not
                # restored in on_span_end - confirm this is intended.
                _context._PARENT_RUN_TREE.set(child_run)
                self._runs[span.span_id] = child_run
            except Exception as e:
                logger.exception("Error creating span run: %s", e)

        def on_span_end(self, span: tracing.Span) -> None:
            """Record outputs, error and metadata on the span's run and patch it.

            Does nothing when no run is registered for the span id.

            Args:
                span: The SDK span that just ended.
            """
            run = self._runs.pop(span.span_id, None)
            if not run:
                return

            try:
                # Extract outputs and metadata
                extracted = agent_utils.extract_span_data(span)
                outputs = extracted.pop("outputs", {})
                inputs = extracted.pop("inputs", {})

                # Update run; inputs captured at start are kept unless the
                # end-of-span extraction produced non-empty ones.
                run.outputs = outputs
                if inputs:
                    run.inputs = inputs
                if error := span.error:
                    run.error = str(error)

                # Add OpenAI metadata
                run_metadata = run.extra.setdefault("metadata", {})
                run_metadata.update(
                    {
                        "openai_parent_id": span.parent_id,
                        "openai_trace_id": span.trace_id,
                        "openai_span_id": span.span_id,
                    }
                )
                # Merge any additional metadata from extracted
                if metadata := extracted.get("metadata"):
                    run_metadata.update(metadata)

                # Response and generation spans both feed the trace-level
                # inputs/outputs: the first non-empty inputs are kept and the
                # outputs are overwritten by each later span. ``or`` keeps the
                # GenerationSpanData lookup lazy.
                span_data = span.span_data
                if isinstance(span_data, tracing.ResponseSpanData) or isinstance(
                    span_data, tracing.GenerationSpanData
                ):
                    trace_id = span.trace_id
                    self._first_response_inputs[trace_id] = (
                        self._first_response_inputs.get(trace_id) or inputs
                    )
                    self._last_response_outputs[trace_id] = outputs

                # End and patch: prefer the SDK's own end timestamp.
                if span.ended_at:
                    run.end_time = datetime.fromisoformat(span.ended_at)
                else:
                    run.end()

                run.patch()
            except Exception as e:
                logger.exception("Error updating span run: %s", e)

        def shutdown(self) -> None:
            """Flush the LangSmith client when the processor is shut down."""
            self.client.flush()

        def force_flush(self) -> None:
            """Flush the LangSmith client on demand."""
            self.client.flush()