Coverage for langsmith/_internal/otel/_otel_exporter.py: 18%
395 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-12-11 16:15 -0800
« prev ^ index » next coverage.py v7.10.1, created at 2025-12-11 16:15 -0800
1"""OpenTelemetry exporter for LangSmith runs."""
3from __future__ import annotations
5import datetime
6import logging
7import time
8import uuid
9import warnings
10from typing import TYPE_CHECKING, Any, Optional
12if TYPE_CHECKING:
13 try:
14 from opentelemetry.context.context import Context # type: ignore[import]
15 from opentelemetry.trace import Span # type: ignore[import]
16 except ImportError:
17 Context = Any # type: ignore[assignment, misc]
18 Span = Any # type: ignore[assignment, misc]
20from langsmith import utils as ls_utils
21from langsmith._internal import _orjson
22from langsmith._internal._operations import (
23 SerializedRunOperation,
24)
25from langsmith._internal._otel_utils import (
26 get_otel_span_id_from_uuid,
27 get_otel_trace_id_from_uuid,
28)
31def _import_otel_exporter():
32 """Dynamically import OTEL exporter modules when needed."""
33 try:
34 from opentelemetry import trace # type: ignore[import]
35 from opentelemetry.context.context import Context # type: ignore[import]
36 from opentelemetry.trace import ( # type: ignore[import]
37 NonRecordingSpan,
38 Span,
39 SpanContext,
40 TraceFlags,
41 TraceState,
42 set_span_in_context,
43 )
45 return (
46 trace,
47 Context,
48 NonRecordingSpan,
49 Span,
50 SpanContext,
51 TraceFlags,
52 TraceState,
53 set_span_in_context,
54 )
55 except ImportError as e:
56 warnings.warn(
57 f"OTEL_ENABLED is set but OpenTelemetry packages are not installed: {e}"
58 )
59 return None
# Module-level logger shared by every function and method in this file.
logger = logging.getLogger(__name__)

# OpenTelemetry GenAI semconv attribute names
# (used as span attribute keys by OTELExporter below).
GEN_AI_OPERATION_NAME = "gen_ai.operation.name"
GEN_AI_SYSTEM = "gen_ai.system"
GEN_AI_REQUEST_MODEL = "gen_ai.request.model"
GEN_AI_RESPONSE_MODEL = "gen_ai.response.model"
GEN_AI_USAGE_INPUT_TOKENS = "gen_ai.usage.input_tokens"
GEN_AI_USAGE_OUTPUT_TOKENS = "gen_ai.usage.output_tokens"
GEN_AI_USAGE_TOTAL_TOKENS = "gen_ai.usage.total_tokens"
GEN_AI_REQUEST_MAX_TOKENS = "gen_ai.request.max_tokens"
GEN_AI_REQUEST_TEMPERATURE = "gen_ai.request.temperature"
GEN_AI_REQUEST_TOP_P = "gen_ai.request.top_p"
GEN_AI_REQUEST_FREQUENCY_PENALTY = "gen_ai.request.frequency_penalty"
GEN_AI_REQUEST_PRESENCE_PENALTY = "gen_ai.request.presence_penalty"
GEN_AI_RESPONSE_FINISH_REASONS = "gen_ai.response.finish_reasons"
# The exporter stores the raw serialized run inputs / outputs under these two.
GENAI_PROMPT = "gen_ai.prompt"
GENAI_COMPLETION = "gen_ai.completion"

# Additional request / response attributes. The exporter copies these from
# same-named keys of the decoded inputs, outputs and ``serialized`` payloads.
# NOTE(review): whether each of these is part of the official GenAI semantic
# conventions is not established by this file - confirm before relying on it.
GEN_AI_REQUEST_EXTRA_QUERY = "gen_ai.request.extra_query"
GEN_AI_REQUEST_EXTRA_BODY = "gen_ai.request.extra_body"
GEN_AI_SERIALIZED_NAME = "gen_ai.serialized.name"
GEN_AI_SERIALIZED_SIGNATURE = "gen_ai.serialized.signature"
GEN_AI_SERIALIZED_DOC = "gen_ai.serialized.doc"
GEN_AI_RESPONSE_ID = "gen_ai.response.id"
GEN_AI_RESPONSE_SERVICE_TIER = "gen_ai.response.service_tier"
GEN_AI_RESPONSE_SYSTEM_FINGERPRINT = "gen_ai.response.system_fingerprint"
GEN_AI_USAGE_INPUT_TOKEN_DETAILS = "gen_ai.usage.input_token_details"
GEN_AI_USAGE_OUTPUT_TOKEN_DETAILS = "gen_ai.usage.output_token_details"

# LangSmith custom attributes
# LANGSMITH_METADATA is used as a prefix: each metadata entry is exported as
# "<LANGSMITH_METADATA>.<key>".
LANGSMITH_SESSION_ID = "langsmith.trace.session_id"
LANGSMITH_SESSION_NAME = "langsmith.trace.session_name"
LANGSMITH_RUN_TYPE = "langsmith.span.kind"
LANGSMITH_NAME = "langsmith.trace.name"
LANGSMITH_METADATA = "langsmith.metadata"
LANGSMITH_TAGS = "langsmith.span.tags"
LANGSMITH_RUNTIME = "langsmith.span.runtime"
LANGSMITH_REQUEST_STREAMING = "langsmith.request.streaming"
LANGSMITH_REQUEST_HEADERS = "langsmith.request.headers"

# GenAI event names
# (not referenced by the code visible in this module).
GEN_AI_SYSTEM_MESSAGE = "gen_ai.system.message"
GEN_AI_USER_MESSAGE = "gen_ai.user.message"
GEN_AI_ASSISTANT_MESSAGE = "gen_ai.assistant.message"
GEN_AI_CHOICE = "gen_ai.choice"
109WELL_KNOWN_OPERATION_NAMES = {
110 "llm": "chat",
111 "tool": "execute_tool",
112 "retriever": "embeddings",
113 "embedding": "embeddings",
114 "prompt": "chat",
115}
118def _get_operation_name(run_type: str) -> str:
119 return WELL_KNOWN_OPERATION_NAMES.get(run_type, run_type)
class OTELExporter:
    """OpenTelemetry exporter for LangSmith runs.

    Each ``post`` operation starts an OpenTelemetry span for the run; any
    other operation updates the span previously started for the same run ID
    and stops tracking it once the run carries an ``end_time``. Spans are
    tracked in ``_span_info`` (run ID -> ``{"span", "created_at"}``) and
    entries older than ``_span_ttl_seconds`` are ended and dropped by a
    periodic cleanup pass.
    """

    # NOTE: the docstring must come before ``__slots__``; otherwise it is a
    # plain expression statement and ``OTELExporter.__doc__`` stays ``None``.
    __slots__ = [
        "_tracer",
        "_span_info",
        "_otel_available",
        "_trace",
        "_span_ttl_seconds",
        "_last_cleanup",
    ]

    def __init__(self, tracer_provider=None, span_ttl_seconds=None):
        """Initialize the OTEL exporter.

        Args:
            tracer_provider: Optional tracer provider to use. If not provided,
                the global tracer provider will be used.
            span_ttl_seconds: TTL for incomplete traces in seconds. If None,
                uses LANGSMITH_OTEL_SPAN_TTL_SECONDS env var (default: 3600s)
        """
        # Set defaults from environment variables if not provided
        if span_ttl_seconds is None:
            span_ttl_seconds = int(
                ls_utils.get_env_var("OTEL_SPAN_TTL_SECONDS", default="3600")
            )

        # State that does not depend on OpenTelemetry being importable.
        self._span_info = {}
        self._span_ttl_seconds = span_ttl_seconds
        self._last_cleanup = 0.0

        otel_imports = _import_otel_exporter()
        if otel_imports is None:
            self._tracer = None
            self._trace = None
            self._otel_available = False
            return

        # Only the ``trace`` module (first element) is needed here.
        trace = otel_imports[0]
        self._tracer = trace.get_tracer("langsmith", tracer_provider=tracer_provider)
        self._trace = trace
        self._otel_available = True

    def export_batch(
        self,
        operations: list[SerializedRunOperation],
        otel_context_map: dict[uuid.UUID, Optional[Context]],
    ) -> None:
        """Export a batch of serialized run operations to OTEL.

        Errors raised while handling one operation are logged and do not stop
        the remaining operations from being processed.

        Args:
            operations: List of serialized run operations to export.
            otel_context_map: Maps a run ID to the OpenTelemetry context that
                a newly created span for that run should be started in, when
                its parent span is not tracked by this exporter.
        """
        if not self._otel_available:
            # Without OpenTelemetry there is no tracer and no span can be
            # created or updated, so skip the per-operation work entirely.
            return

        # Proactive cleanup of expired spans before new operations
        self._cleanup_stale_spans()

        for op in operations:
            try:
                run_info = self._deserialize_run_info(op)
                if not run_info:
                    continue
                if op.operation == "post":
                    span = self._create_span_for_run(
                        op, run_info, otel_context_map.get(op.id)
                    )
                    if span:
                        self._span_info[op.id] = {
                            "span": span,
                            "created_at": time.time(),
                        }
                else:
                    self._update_span_for_run(op, run_info)
            except Exception as e:
                logger.exception("Error processing operation %s: %s", op.id, e)

    def _deserialize_run_info(self, op: SerializedRunOperation) -> Optional[dict]:
        """Deserialize the run info from the operation.

        Args:
            op: The serialized run operation.

        Returns:
            The deserialized run info as a dictionary, or None if deserialization
            failed.
        """
        try:
            return op.deserialize_run_info()
        except Exception as e:
            logger.exception("Failed to deserialize run info for %s: %s", op.id, e)
            return None

    def _create_span_for_run(
        self,
        op: SerializedRunOperation,
        run_info: dict,
        otel_context: Optional[Context] = None,
    ) -> Optional[Span]:
        """Create an OpenTelemetry span for a run operation.

        The span is parented, in order of preference, to: the tracked span of
        ``run_info["parent_run_id"]``; the supplied ``otel_context``; a
        context built from IDs derived from ``op.trace_id`` and ``op.id``.

        Args:
            op: The serialized run operation.
            run_info: The deserialized run info.
            otel_context: Optional OpenTelemetry context to start the span in
                when the parent run has no tracked span.

        Returns:
            The created span, or None if creation failed.
        """
        try:
            # Get OTEL imports for this operation
            otel_imports = _import_otel_exporter()
            if otel_imports is None:
                return None
            (
                trace,
                _context_cls,
                NonRecordingSpan,
                _span_cls,
                SpanContext,
                TraceFlags,
                TraceState,
                set_span_in_context,
            ) = otel_imports

            start_time_utc_nano = self._as_utc_nano(run_info.get("start_time"))

            # Create deterministic trace and span IDs to match user OpenTelemetry spans
            span_context = SpanContext(
                trace_id=get_otel_trace_id_from_uuid(op.trace_id),
                span_id=get_otel_span_id_from_uuid(op.id),
                is_remote=False,
                trace_flags=TraceFlags(TraceFlags.SAMPLED),
                trace_state=TraceState(),
            )
            # A NonRecordingSpan only carries the IDs into a context.
            deterministic_context = set_span_in_context(NonRecordingSpan(span_context))

            # Resolve the parent run ID once; accept an existing UUID object
            # as well as its string form.
            parent_run_id = run_info.get("parent_run_id")
            parent_info = None
            if parent_run_id is not None:
                if not isinstance(parent_run_id, uuid.UUID):
                    parent_run_id = uuid.UUID(str(parent_run_id))
                parent_info = self._span_info.get(parent_run_id)

            if parent_info is not None:
                # Use the parent span context
                span_parent_context = set_span_in_context(parent_info["span"])
            else:
                # For root spans, check if there's an existing OpenTelemetry context
                # If so, inherit from it; otherwise use our deterministic context
                span_parent_context = (
                    otel_context if otel_context else deterministic_context
                )

            span = self._tracer.start_span(
                run_info.get("name"),
                context=span_parent_context,
                start_time=start_time_utc_nano,
            )

            # Set all attributes
            self._set_span_attributes(span, run_info, op)

            # Set status and end the span if the run is already finished
            self._apply_status_and_end(span, run_info, trace.StatusCode)

            return span
        except Exception as e:
            logger.exception("Failed to create span for run %s: %s", op.id, e)
            return None

    def _update_span_for_run(self, op: SerializedRunOperation, run_info: dict) -> None:
        """Update an OpenTelemetry span for a run operation.

        Does nothing (besides a debug log) when no span is tracked for the
        run. When the run info carries an ``end_time`` the span is ended and
        removed from tracking.

        Args:
            op: The serialized run operation.
            run_info: The deserialized run info.
        """
        try:
            # Get the span for this run
            info = self._span_info.get(op.id)
            if info is None:
                logger.debug("No span found for run %s during update", op.id)
                return

            span = info["span"]

            # Update attributes
            self._set_span_attributes(span, run_info, op)

            if self._apply_status_and_end(span, run_info, self._trace.StatusCode):
                # Remove the span info from our dictionary
                self._span_info.pop(op.id, None)
                logger.debug(
                    "Completed span, remaining spans: %d", len(self._span_info)
                )
            else:
                # Span exists but no end_time - this is normal for ongoing operations
                logger.debug("Updated span (no end_time yet)")

        except Exception as e:
            logger.exception("Failed to update span for run %s: %s", op.id, e)

    def _apply_status_and_end(self, span: Span, run_info: dict, status_code: Any) -> bool:
        """Set the span status from the run's error and end it if finished.

        Args:
            span: The span to update.
            run_info: The deserialized run info.
            status_code: Object exposing ``ERROR`` and ``OK`` members
                (the OpenTelemetry ``StatusCode`` enum).

        Returns:
            True if the run info had an ``end_time`` and the span was ended,
            False otherwise.
        """
        error = run_info.get("error")
        if error:
            span.set_status(status_code.ERROR)
            span.record_exception(Exception(error))
        else:
            span.set_status(status_code.OK)

        end_time = run_info.get("end_time")
        if not end_time:
            return False

        end_time_utc_nano = self._as_utc_nano(end_time)
        if end_time_utc_nano:
            span.end(end_time=end_time_utc_nano)
        else:
            # Unparseable end time: end the span at the current time instead.
            span.end()
        return True

    def _cleanup_stale_spans(self) -> None:
        """Clean up spans older than TTL threshold."""
        if not self._span_info:
            return

        current_time = time.time()

        # Only run cleanup every 10 seconds to reduce overhead
        if current_time - self._last_cleanup < 10.0:
            return

        self._last_cleanup = current_time
        cutoff_time = current_time - self._span_ttl_seconds

        # Collect first: _remove_span mutates the dict being scanned.
        stale_span_ids = [
            span_id
            for span_id, info in self._span_info.items()
            if info["created_at"] < cutoff_time
        ]

        if stale_span_ids:
            logger.info(
                " LangSmith OTEL Cleanup: Removing %d stale spans",
                len(stale_span_ids),
            )

        for span_id in stale_span_ids:
            self._remove_span(span_id)

    def _remove_span(self, span_id: uuid.UUID) -> None:
        """Remove a single span and clean up resources.

        Note:
            We call `span.end()` here because spans in `_span_info` are orphaned -
            they never received their patch operation and will never naturally complete.

            Ending them gracefully is better than leaving them open indefinitely.
        """
        # Remove from tracking first so the entry is gone even if ending fails.
        info = self._span_info.pop(span_id, None)
        if info is None:
            return

        try:
            span = info["span"]
            if not hasattr(span, "end"):
                return
            # Check if span is still active before ending it
            if hasattr(span, "is_recording") and span.is_recording():
                span.end()
                logger.debug("Ended orphaned span %s", span_id)
            else:
                # Span already ended, just log it
                logger.debug("Span %s already ended, skipping end() call", span_id)
        except Exception as e:
            logger.debug("Error removing span %s: %s", span_id, e)

    @staticmethod
    def _get_metadata(run_info: dict) -> dict:
        """Return ``run_info["extra"]["metadata"]`` or an empty dict.

        Missing keys, ``None`` values and non-dict values all yield ``{}``.
        """
        extra = run_info.get("extra")
        metadata = extra.get("metadata") if isinstance(extra, dict) else None
        return metadata if isinstance(metadata, dict) else {}

    def _extract_model_name(self, run_info: dict) -> Optional[str]:
        """Extract model name from run info.

        Looks at the run metadata, preferring ``ls_model_name``, then
        ``invocation_params["model"]``, then ``invocation_params["model_name"]``.

        Args:
            run_info: The run info.

        Returns:
            The model name, or None if not found.
        """
        metadata = self._get_metadata(run_info)

        # First check for ls_model_name in metadata
        if metadata.get("ls_model_name"):
            return metadata["ls_model_name"]

        # Then check invocation_params for model info
        invocation_params = metadata.get("invocation_params")
        if isinstance(invocation_params, dict):
            # Check model first, then model_name
            return (
                invocation_params.get("model")
                or invocation_params.get("model_name")
                or None
            )

        return None

    def _set_span_attributes(
        self,
        span: Span,
        run_info: dict,
        op: SerializedRunOperation,
    ) -> None:
        """Set attributes on the span.

        Args:
            span: The span to set attributes on.
            run_info: The deserialized run info.
            op: The serialized run operation.
        """
        # Set LangSmith-specific attributes
        for key, attribute in (
            ("run_type", LANGSMITH_RUN_TYPE),
            ("name", LANGSMITH_NAME),
            ("session_id", LANGSMITH_SESSION_ID),
            ("session_name", LANGSMITH_SESSION_NAME),
        ):
            value = run_info.get(key)
            if value:
                span.set_attribute(attribute, str(value))

        # Set GenAI attributes according to OTEL semantic conventions
        if op.operation == "post":
            # ``or`` also covers a run_type that is present but None/empty.
            operation_name = _get_operation_name(run_info.get("run_type") or "chain")
            span.set_attribute(GEN_AI_OPERATION_NAME, operation_name)

            # Set gen_ai.system
            self._set_gen_ai_system(span, run_info)

        # Set model name if available
        model_name = self._extract_model_name(run_info)
        if model_name:
            span.set_attribute(GEN_AI_REQUEST_MODEL, model_name)

        # Set token usage information
        for key, attribute in (
            ("prompt_tokens", GEN_AI_USAGE_INPUT_TOKENS),
            ("completion_tokens", GEN_AI_USAGE_OUTPUT_TOKENS),
            ("total_tokens", GEN_AI_USAGE_TOTAL_TOKENS),
        ):
            value = run_info.get(key)
            if value is not None:
                span.set_attribute(attribute, int(value))

        # Set other parameters from invocation_params
        self._set_invocation_parameters(span, run_info)

        # Set metadata and tags if available
        for key, value in self._get_metadata(run_info).items():
            if value is not None:
                span.set_attribute(f"{LANGSMITH_METADATA}.{key}", value)

        tags = run_info.get("tags")
        if tags:
            if isinstance(tags, list):
                # str() keeps the join from failing on non-string tags.
                span.set_attribute(LANGSMITH_TAGS, ", ".join(str(tag) for tag in tags))
            else:
                span.set_attribute(LANGSMITH_TAGS, tags)

        # Support additional serialized attributes, if present
        serialized = run_info.get("serialized")
        if serialized and isinstance(serialized, dict):
            for key, attribute in (
                ("name", GEN_AI_SERIALIZED_NAME),
                ("signature", GEN_AI_SERIALIZED_SIGNATURE),
                ("doc", GEN_AI_SERIALIZED_DOC),
            ):
                value = serialized.get(key)
                if value is not None:
                    span.set_attribute(attribute, value)

        # Set inputs/outputs if available
        self._set_io_attributes(span, op)

    def _set_gen_ai_system(self, span: Span, run_info: dict) -> None:
        """Set the gen_ai.system attribute on the span based on the model provider.

        The provider is guessed from substrings of the model name; the first
        matching rule wins, so the order of the checks below is significant.

        Args:
            span: The span to set attributes on.
            run_info: The deserialized run info.
        """
        # Default to "langchain" if we can't determine the system
        system = "langchain"

        # Extract model name to determine the system
        model_name = self._extract_model_name(run_info)
        if model_name:
            model_lower = str(model_name).lower()
            if "anthropic" in model_lower or model_lower.startswith("claude"):
                system = "anthropic"
            elif "bedrock" in model_lower:
                system = "aws.bedrock"
            elif "azure" in model_lower and "openai" in model_lower:
                system = "az.ai.openai"
            elif "azure" in model_lower and "inference" in model_lower:
                system = "az.ai.inference"
            elif "cohere" in model_lower:
                system = "cohere"
            elif "deepseek" in model_lower:
                system = "deepseek"
            elif "gemini" in model_lower:
                system = "gemini"
            elif "groq" in model_lower:
                system = "groq"
            elif "watson" in model_lower or "ibm" in model_lower:
                system = "ibm.watsonx.ai"
            elif "mistral" in model_lower:
                system = "mistral_ai"
            elif "gpt" in model_lower or "openai" in model_lower:
                system = "openai"
            elif "perplexity" in model_lower or "sonar" in model_lower:
                system = "perplexity"
            elif "vertex" in model_lower:
                system = "vertex_ai"
            elif "xai" in model_lower or "grok" in model_lower:
                system = "xai"
            elif "qwen" in model_lower:
                system = "qwen"

        span.set_attribute(GEN_AI_SYSTEM, system)
        # Also stash the value on the span object itself.
        setattr(span, "_gen_ai_system", system)

    def _set_invocation_parameters(self, span: Span, run_info: dict) -> None:
        """Set invocation parameters on the span.

        Copies the sampling parameters found in the run metadata's
        ``invocation_params`` onto the span; absent or ``None`` values are
        skipped.

        Args:
            span: The span to set attributes on.
            run_info: The deserialized run info.
        """
        invocation_params = self._get_metadata(run_info).get("invocation_params")
        if not isinstance(invocation_params, dict):
            return

        # Set relevant invocation parameters
        for key, attribute in (
            ("max_tokens", GEN_AI_REQUEST_MAX_TOKENS),
            ("temperature", GEN_AI_REQUEST_TEMPERATURE),
            ("top_p", GEN_AI_REQUEST_TOP_P),
            ("frequency_penalty", GEN_AI_REQUEST_FREQUENCY_PENALTY),
            ("presence_penalty", GEN_AI_REQUEST_PRESENCE_PENALTY),
        ):
            value = invocation_params.get(key)
            if value is not None:
                span.set_attribute(attribute, value)

    def _set_io_attributes(self, span: Span, op: SerializedRunOperation) -> None:
        """Set input/output attributes on the span.

        Args:
            span: The span to set attributes on.
            op: The serialized run operation.
        """
        if op.inputs:
            self._set_input_attributes(span, op)
        if op.outputs:
            self._set_output_attributes(span, op)

    def _set_input_attributes(self, span: Span, op: SerializedRunOperation) -> None:
        """Export the serialized inputs plus request fields extracted from them."""
        try:
            inputs = _orjson.loads(op.inputs)

            if isinstance(inputs, dict):
                # Only treat "model" as the request model for chat-style
                # payloads, i.e. when a "messages" list is present too.
                if (
                    isinstance(inputs.get("messages"), list)
                    and inputs.get("model") is not None
                ):
                    span.set_attribute(GEN_AI_REQUEST_MODEL, inputs["model"])

                # Set additional request attributes if available.
                for key, attribute in (
                    ("stream", LANGSMITH_REQUEST_STREAMING),
                    ("extra_headers", LANGSMITH_REQUEST_HEADERS),
                    ("extra_query", GEN_AI_REQUEST_EXTRA_QUERY),
                    ("extra_body", GEN_AI_REQUEST_EXTRA_BODY),
                ):
                    value = inputs.get(key)
                    if value is not None:
                        span.set_attribute(attribute, value)

            span.set_attribute(GENAI_PROMPT, op.inputs)

        except Exception:
            logger.debug("Failed to process inputs for run %s", op.id, exc_info=True)

    def _set_output_attributes(self, span: Span, op: SerializedRunOperation) -> None:
        """Export the serialized outputs plus response fields extracted from them."""
        try:
            outputs = _orjson.loads(op.outputs)

            # Structured extraction only applies to JSON objects; any other
            # payload (list, string, ...) is still exported raw below.
            if isinstance(outputs, dict):
                self._set_response_attributes(span, outputs)

            span.set_attribute(GENAI_COMPLETION, op.outputs)

        except Exception:
            logger.debug("Failed to process outputs for run %s", op.id, exc_info=True)

    def _set_response_attributes(self, span: Span, outputs: dict) -> None:
        """Set token usage and response metadata found in decoded outputs."""
        # Extract token usage from outputs (for LLM runs)
        token_usage = self.get_unified_run_tokens(outputs)
        if token_usage is not None:
            input_tokens, output_tokens = token_usage
            span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, input_tokens)
            span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, output_tokens)
            span.set_attribute(GEN_AI_USAGE_TOTAL_TOKENS, input_tokens + output_tokens)

        # A null model is skipped rather than exported as the string "None".
        if outputs.get("model") is not None:
            span.set_attribute(GEN_AI_RESPONSE_MODEL, str(outputs["model"]))

        # Extract additional response attributes.
        if outputs.get("id") is not None:
            span.set_attribute(GEN_AI_RESPONSE_ID, outputs["id"])

        choices = outputs.get("choices")
        if isinstance(choices, list):
            finish_reasons = [
                str(choice["finish_reason"])
                for choice in choices
                if isinstance(choice, dict) and choice.get("finish_reason") is not None
            ]
            if finish_reasons:
                span.set_attribute(
                    GEN_AI_RESPONSE_FINISH_REASONS, ", ".join(finish_reasons)
                )

        for key, attribute in (
            ("service_tier", GEN_AI_RESPONSE_SERVICE_TIER),
            ("system_fingerprint", GEN_AI_RESPONSE_SYSTEM_FINGERPRINT),
        ):
            value = outputs.get(key)
            if value is not None:
                span.set_attribute(attribute, value)

        usage_metadata = outputs.get("usage_metadata")
        if isinstance(usage_metadata, dict):
            for key, attribute in (
                ("input_token_details", GEN_AI_USAGE_INPUT_TOKEN_DETAILS),
                ("output_token_details", GEN_AI_USAGE_OUTPUT_TOKEN_DETAILS),
            ):
                details = usage_metadata.get(key)
                if details is not None:
                    # Details are exported in their string form.
                    span.set_attribute(attribute, str(details))

    def _as_utc_nano(self, timestamp: Optional[str]) -> Optional[int]:
        """Convert an ISO-8601 timestamp to nanoseconds since the Unix epoch.

        Args:
            timestamp: ISO-8601 string (a trailing ``Z`` is accepted), or a
                ``datetime`` instance.

        Returns:
            Whole nanoseconds since 1970-01-01T00:00:00Z, or None when
            ``timestamp`` is empty or cannot be parsed.
        """
        if not timestamp:
            return None
        try:
            if isinstance(timestamp, datetime.datetime):
                dt = timestamp
            else:
                text = str(timestamp)
                # datetime.fromisoformat only understands "Z" from Python 3.11.
                if text.endswith(("Z", "z")):
                    text = text[:-1] + "+00:00"
                dt = datetime.datetime.fromisoformat(text)
            # NOTE(review): astimezone() interprets a naive datetime as local
            # time - confirm that callers always send offset-aware timestamps.
            epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
            delta = dt.astimezone(datetime.timezone.utc) - epoch
            # Integer arithmetic keeps microsecond precision exact, unlike
            # multiplying the float result of timestamp() by 1e9.
            whole_seconds = delta.days * 86_400 + delta.seconds
            return whole_seconds * 1_000_000_000 + delta.microseconds * 1_000
        except (ValueError, TypeError):
            logger.exception("Failed to parse timestamp %s", timestamp)
            return None

    def get_unified_run_tokens(
        self, outputs: Optional[dict]
    ) -> Optional[tuple[int, int]]:
        """Find ``(input_tokens, output_tokens)`` in a run's decoded outputs.

        Searched in order: ``outputs["usage_metadata"]``; the
        ``usage_metadata`` of any dict-valued entry of ``outputs`` (directly,
        or under its ``kwargs`` when the entry has ``lc == 1``); and the
        ``message.kwargs.usage_metadata`` of each item in
        ``outputs["generations"]``.

        Args:
            outputs: The decoded run outputs.

        Returns:
            The first ``(input_tokens, output_tokens)`` pair found, or None.
        """
        if not outputs or not isinstance(outputs, dict):
            return None

        # search in non-generations lists
        if output := self._extract_unified_run_tokens(outputs.get("usage_metadata")):
            return output

        # find if direct kwarg in outputs
        for haystack in outputs.values():
            if not haystack or not isinstance(haystack, dict):
                continue

            if output := self._extract_unified_run_tokens(
                haystack.get("usage_metadata")
            ):
                return output

            if (
                haystack.get("lc") == 1
                and isinstance(haystack.get("kwargs"), dict)
                and (
                    output := self._extract_unified_run_tokens(
                        haystack["kwargs"].get("usage_metadata")
                    )
                )
            ):
                return output

        # find in generations
        generations = outputs.get("generations") or []
        if not isinstance(generations, list):
            return None
        # Accept both a flat list of generations and a list of lists.
        if generations and not isinstance(generations[0], list):
            generations = [generations]

        for group in generations:
            if not isinstance(group, list):
                continue
            for generation in group:
                if not isinstance(generation, dict):
                    continue
                message = generation.get("message")
                if not isinstance(message, dict):
                    continue
                kwargs = message.get("kwargs")
                if not isinstance(kwargs, dict):
                    continue
                if output := self._extract_unified_run_tokens(
                    kwargs.get("usage_metadata")
                ):
                    return output
        return None

    def _extract_unified_run_tokens(
        self, outputs: Optional[Any]
    ) -> Optional[tuple[int, int]]:
        """Read integer token counts out of a ``usage_metadata`` mapping.

        Args:
            outputs: Candidate ``usage_metadata`` value (any type).

        Returns:
            ``(input_tokens, output_tokens)`` when ``outputs`` is a dict in
            which both keys hold integers (booleans excluded), else None.
        """
        if not outputs or not isinstance(outputs, dict):
            return None

        input_tokens = outputs.get("input_tokens")
        output_tokens = outputs.get("output_tokens")
        for count in (input_tokens, output_tokens):
            # bool is a subclass of int but is never a valid token count.
            if isinstance(count, bool) or not isinstance(count, int):
                return None

        return input_tokens, output_tokens