Coverage for langsmith/_internal/otel/_otel_exporter.py: 18%

395 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-12-11 16:15 -0800

1"""OpenTelemetry exporter for LangSmith runs.""" 

2 

3from __future__ import annotations 

4 

5import datetime 

6import logging 

7import time 

8import uuid 

9import warnings 

10from typing import TYPE_CHECKING, Any, Optional 

11 

12if TYPE_CHECKING: 

13 try: 

14 from opentelemetry.context.context import Context # type: ignore[import] 

15 from opentelemetry.trace import Span # type: ignore[import] 

16 except ImportError: 

17 Context = Any # type: ignore[assignment, misc] 

18 Span = Any # type: ignore[assignment, misc] 

19 

20from langsmith import utils as ls_utils 

21from langsmith._internal import _orjson 

22from langsmith._internal._operations import ( 

23 SerializedRunOperation, 

24) 

25from langsmith._internal._otel_utils import ( 

26 get_otel_span_id_from_uuid, 

27 get_otel_trace_id_from_uuid, 

28) 

29 

30 

31def _import_otel_exporter(): 

32 """Dynamically import OTEL exporter modules when needed.""" 

33 try: 

34 from opentelemetry import trace # type: ignore[import] 

35 from opentelemetry.context.context import Context # type: ignore[import] 

36 from opentelemetry.trace import ( # type: ignore[import] 

37 NonRecordingSpan, 

38 Span, 

39 SpanContext, 

40 TraceFlags, 

41 TraceState, 

42 set_span_in_context, 

43 ) 

44 

45 return ( 

46 trace, 

47 Context, 

48 NonRecordingSpan, 

49 Span, 

50 SpanContext, 

51 TraceFlags, 

52 TraceState, 

53 set_span_in_context, 

54 ) 

55 except ImportError as e: 

56 warnings.warn( 

57 f"OTEL_ENABLED is set but OpenTelemetry packages are not installed: {e}" 

58 ) 

59 return None 

60 

61 

logger = logging.getLogger(__name__)

# --- Span attribute keys in the OpenTelemetry GenAI ``gen_ai.*`` namespace ---
GEN_AI_OPERATION_NAME = "gen_ai.operation.name"
GEN_AI_SYSTEM = "gen_ai.system"
GEN_AI_REQUEST_MODEL = "gen_ai.request.model"
GEN_AI_RESPONSE_MODEL = "gen_ai.response.model"
GEN_AI_USAGE_INPUT_TOKENS = "gen_ai.usage.input_tokens"
GEN_AI_USAGE_OUTPUT_TOKENS = "gen_ai.usage.output_tokens"
GEN_AI_USAGE_TOTAL_TOKENS = "gen_ai.usage.total_tokens"
GEN_AI_REQUEST_MAX_TOKENS = "gen_ai.request.max_tokens"
GEN_AI_REQUEST_TEMPERATURE = "gen_ai.request.temperature"
GEN_AI_REQUEST_TOP_P = "gen_ai.request.top_p"
GEN_AI_REQUEST_FREQUENCY_PENALTY = "gen_ai.request.frequency_penalty"
GEN_AI_REQUEST_PRESENCE_PENALTY = "gen_ai.request.presence_penalty"
GEN_AI_RESPONSE_FINISH_REASONS = "gen_ai.response.finish_reasons"
# Raw serialized inputs / outputs of the run.
GENAI_PROMPT = "gen_ai.prompt"
GENAI_COMPLETION = "gen_ai.completion"

# Additional request / response / usage details.
GEN_AI_REQUEST_EXTRA_QUERY = "gen_ai.request.extra_query"
GEN_AI_REQUEST_EXTRA_BODY = "gen_ai.request.extra_body"
GEN_AI_SERIALIZED_NAME = "gen_ai.serialized.name"
GEN_AI_SERIALIZED_SIGNATURE = "gen_ai.serialized.signature"
GEN_AI_SERIALIZED_DOC = "gen_ai.serialized.doc"
GEN_AI_RESPONSE_ID = "gen_ai.response.id"
GEN_AI_RESPONSE_SERVICE_TIER = "gen_ai.response.service_tier"
GEN_AI_RESPONSE_SYSTEM_FINGERPRINT = "gen_ai.response.system_fingerprint"
GEN_AI_USAGE_INPUT_TOKEN_DETAILS = "gen_ai.usage.input_token_details"
GEN_AI_USAGE_OUTPUT_TOKEN_DETAILS = "gen_ai.usage.output_token_details"

# --- LangSmith-specific span attribute keys ---
LANGSMITH_SESSION_ID = "langsmith.trace.session_id"
LANGSMITH_SESSION_NAME = "langsmith.trace.session_name"
LANGSMITH_RUN_TYPE = "langsmith.span.kind"
LANGSMITH_NAME = "langsmith.trace.name"
LANGSMITH_METADATA = "langsmith.metadata"
LANGSMITH_TAGS = "langsmith.span.tags"
LANGSMITH_RUNTIME = "langsmith.span.runtime"
LANGSMITH_REQUEST_STREAMING = "langsmith.request.streaming"
LANGSMITH_REQUEST_HEADERS = "langsmith.request.headers"

# --- GenAI event names ---
GEN_AI_SYSTEM_MESSAGE = "gen_ai.system.message"
GEN_AI_USER_MESSAGE = "gen_ai.user.message"
GEN_AI_ASSISTANT_MESSAGE = "gen_ai.assistant.message"
GEN_AI_CHOICE = "gen_ai.choice"

# Maps a LangSmith run type to the value written to ``gen_ai.operation.name``.
# Run types that are not listed here are passed through unchanged.
WELL_KNOWN_OPERATION_NAMES = {
    "llm": "chat",
    "tool": "execute_tool",
    "retriever": "embeddings",
    "embedding": "embeddings",
    "prompt": "chat",
}

116 

117 

def _get_operation_name(run_type: str) -> str:
    """Return the ``gen_ai.operation.name`` value for a LangSmith run type.

    Run types without an entry in ``WELL_KNOWN_OPERATION_NAMES`` are returned
    unchanged.
    """
    if run_type in WELL_KNOWN_OPERATION_NAMES:
        return WELL_KNOWN_OPERATION_NAMES[run_type]
    return run_type

120 

121 

class OTELExporter:
    """OpenTelemetry exporter for LangSmith runs.

    Converts serialized LangSmith run operations into OpenTelemetry spans.
    A ``post`` operation starts a span; any other operation for the same run
    id updates that span and, once an ``end_time`` is present, ends it. Spans
    still tracked after ``span_ttl_seconds`` are ended and dropped.
    """

    __slots__ = [
        "_tracer",
        "_span_info",
        "_otel_available",
        "_trace",
        "_span_ttl_seconds",
        "_last_cleanup",
    ]

    def __init__(self, tracer_provider=None, span_ttl_seconds=None):
        """Initialize the OTEL exporter.

        Args:
            tracer_provider: Optional tracer provider to use. If not provided,
                the global tracer provider will be used.
            span_ttl_seconds: TTL for incomplete traces in seconds. If None,
                uses LANGSMITH_OTEL_SPAN_TTL_SECONDS env var (default: 3600s)
        """
        # Set defaults from environment variables if not provided
        if span_ttl_seconds is None:
            span_ttl_seconds = int(
                ls_utils.get_env_var("OTEL_SPAN_TTL_SECONDS", default="3600")
            )

        # State that is identical whether or not OpenTelemetry is installed.
        # Maps run id -> {"span": <span>, "created_at": <time.time() value>}.
        self._span_info = {}
        self._span_ttl_seconds = span_ttl_seconds
        self._last_cleanup = 0.0

        otel_imports = _import_otel_exporter()
        if otel_imports is None:
            self._tracer = None
            self._trace = None
            self._otel_available = False
            return

        # Only the ``trace`` module (first tuple element) is needed here.
        trace = otel_imports[0]
        self._tracer = trace.get_tracer("langsmith", tracer_provider=tracer_provider)
        self._trace = trace
        self._otel_available = True

    def export_batch(
        self,
        operations: list[SerializedRunOperation],
        otel_context_map: dict[uuid.UUID, Optional[Context]],
    ) -> None:
        """Export a batch of serialized run operations to OTEL.

        Args:
            operations: List of serialized run operations to export.
            otel_context_map: Maps a run id to the OpenTelemetry context that
                should parent a newly created span for that run, if any.
        """
        # Without OpenTelemetry there is no tracer, so no span can be created
        # or updated; skip the per-operation deserialization entirely.
        if not self._otel_available:
            return

        # Proactive cleanup of expired spans before handling new operations
        self._cleanup_stale_spans()

        for op in operations:
            try:
                run_info = self._deserialize_run_info(op)
                if not run_info:
                    continue
                if op.operation != "post":
                    self._update_span_for_run(op, run_info)
                    continue
                span = self._create_span_for_run(
                    op, run_info, otel_context_map.get(op.id)
                )
                if span:
                    self._span_info[op.id] = {
                        "span": span,
                        "created_at": time.time(),
                    }
            except Exception as e:
                # One bad operation must not prevent the rest of the batch.
                logger.exception(f"Error processing operation {op.id}: {e}")

    def _deserialize_run_info(self, op: SerializedRunOperation) -> Optional[dict]:
        """Deserialize the run info from the operation.

        Args:
            op: The serialized run operation.

        Returns:
            The deserialized run info as a dictionary, or None if deserialization
            failed.
        """
        try:
            return op.deserialize_run_info()
        except Exception as e:
            logger.exception(f"Failed to deserialize run info for {op.id}: {e}")
            return None

    def _apply_status_and_end(self, span: Span, run_info: dict, status_code) -> bool:
        """Set the span status from the run's error and end it if finished.

        Args:
            span: The span to update.
            run_info: The deserialized run info.
            status_code: Object exposing ``ERROR`` and ``OK`` members
                (the OpenTelemetry ``StatusCode`` enum).

        Returns:
            True if the run had an ``end_time`` and the span was ended.
        """
        error = run_info.get("error")
        if error:
            span.set_status(status_code.ERROR)
            span.record_exception(Exception(error))
        else:
            span.set_status(status_code.OK)

        end_time = run_info.get("end_time")
        if not end_time:
            return False

        end_time_utc_nano = self._as_utc_nano(end_time)
        if end_time_utc_nano:
            span.end(end_time=end_time_utc_nano)
        else:
            # Unparseable end time: end the span at the current time instead.
            span.end()
        return True

    def _create_span_for_run(
        self,
        op: SerializedRunOperation,
        run_info: dict,
        otel_context: Optional[Context] = None,
    ) -> Optional[Span]:
        """Create an OpenTelemetry span for a run operation.

        Args:
            op: The serialized run operation.
            run_info: The deserialized run info.
            otel_context: Optional OpenTelemetry context to start the span in
                when the run's parent span is not tracked by this exporter.

        Returns:
            The created span, or None if creation failed.
        """
        try:
            start_time_utc_nano = self._as_utc_nano(run_info.get("start_time"))

            # Create deterministic trace and span IDs to match user OpenTelemetry spans
            trace_id_int = get_otel_trace_id_from_uuid(op.trace_id)
            span_id_int = get_otel_span_id_from_uuid(op.id)

            # Get OTEL imports for this operation
            otel_imports = _import_otel_exporter()
            if otel_imports is None:
                return None
            (
                trace,
                _context_cls,
                NonRecordingSpan,
                _span_cls,
                SpanContext,
                TraceFlags,
                TraceState,
                set_span_in_context,
            ) = otel_imports

            # Create SpanContext with deterministic IDs
            span_context = SpanContext(
                trace_id=trace_id_int,
                span_id=span_id_int,
                is_remote=False,
                trace_flags=TraceFlags(TraceFlags.SAMPLED),
                trace_state=TraceState(),
            )

            # Create NonRecordingSpan for context setting
            non_recording_span = NonRecordingSpan(span_context)
            deterministic_context = set_span_in_context(non_recording_span)

            # Look the parent up once; the id may arrive as a string or a UUID.
            parent_entry = None
            parent_run_id = run_info.get("parent_run_id")
            if parent_run_id is not None:
                if not isinstance(parent_run_id, uuid.UUID):
                    parent_run_id = uuid.UUID(parent_run_id)
                parent_entry = self._span_info.get(parent_run_id)

            if parent_entry is not None:
                # Use the parent span context
                span_parent_context = set_span_in_context(parent_entry["span"])
            else:
                # For root spans, check if there's an existing OpenTelemetry context
                # If so, inherit from it; otherwise use our deterministic context
                span_parent_context = (
                    otel_context if otel_context else deterministic_context
                )

            span = self._tracer.start_span(
                run_info.get("name"),
                context=span_parent_context,
                start_time=start_time_utc_nano,
            )

            # Set all attributes
            self._set_span_attributes(span, run_info, op)

            # Set status based on error and end the span if end_time is present
            self._apply_status_and_end(span, run_info, trace.StatusCode)

            return span
        except Exception as e:
            logger.exception(f"Failed to create span for run {op.id}: {e}")
            return None

    def _update_span_for_run(self, op: SerializedRunOperation, run_info: dict) -> None:
        """Update an OpenTelemetry span for a run operation.

        Args:
            op: The serialized run operation.
            run_info: The deserialized run info.
        """
        try:
            # Get the span for this run
            if op.id not in self._span_info:
                logger.debug("No span found for run %s during update", op.id)
                return

            span = self._span_info[op.id]["span"]

            # Update attributes
            self._set_span_attributes(span, run_info, op)

            # Update status based on error and end the span if end_time is present
            if self._apply_status_and_end(span, run_info, self._trace.StatusCode):
                # Remove the span info from our dictionary
                del self._span_info[op.id]
                logger.debug(
                    "Completed span, remaining spans: %s", len(self._span_info)
                )
            else:
                # Span exists but no end_time - this is normal for ongoing operations
                logger.debug("Updated span (no end_time yet)")

        except Exception as e:
            logger.exception(f"Failed to update span for run {op.id}: {e}")

    def _cleanup_stale_spans(self) -> None:
        """Clean up spans older than TTL threshold."""
        if not self._span_info:
            return

        current_time = time.time()

        # Only run cleanup every 10 seconds to reduce overhead
        if current_time - self._last_cleanup < 10.0:
            return

        self._last_cleanup = current_time
        cutoff_time = current_time - self._span_ttl_seconds

        # Collect ids first so the dict is not mutated while iterating it.
        stale_span_ids = [
            span_id
            for span_id, info in self._span_info.items()
            if info["created_at"] < cutoff_time
        ]

        if stale_span_ids:
            logger.info(
                f" LangSmith OTEL Cleanup: Removing {len(stale_span_ids)} stale spans"
            )

        for span_id in stale_span_ids:
            self._remove_span(span_id)

    def _remove_span(self, span_id: uuid.UUID) -> None:
        """Remove a single span and clean up resources.

        Note:
            We call `span.end()` here because spans in `_span_info` are orphaned -
            they never received their patch operation and will never naturally complete.

            Ending them gracefully is better than leaving them open indefinitely.
        """
        if span_id not in self._span_info:
            return

        # Drop the entry up front so it is removed even if ending the span fails.
        info = self._span_info.pop(span_id)

        try:
            span = info["span"]
            if not hasattr(span, "end"):
                return

            # Check if span is still active before ending it
            if hasattr(span, "is_recording") and span.is_recording():
                span.end()
                logger.debug("Ended orphaned span %s", span_id)
            else:
                # Span already ended, just log it
                logger.debug("Span %s already ended, skipping end() call", span_id)
        except Exception as e:
            logger.debug(f"Error removing span {span_id}: {e}")

    def _extract_model_name(self, run_info: dict) -> Optional[str]:
        """Extract model name from run info.

        Looks at ``extra.metadata.ls_model_name`` first, then at ``model`` and
        ``model_name`` inside ``extra.metadata.invocation_params``.

        Args:
            run_info: The run info.

        Returns:
            The model name, or None if not found.
        """
        extra = run_info.get("extra")
        metadata = extra.get("metadata") if extra else None
        if not metadata:
            return None

        # First check for ls_model_name in metadata
        if metadata.get("ls_model_name"):
            return metadata["ls_model_name"]

        # Then check invocation_params for model info
        invocation_params = metadata.get("invocation_params")
        if isinstance(invocation_params, dict):
            # Check model first, then model_name
            for key in ("model", "model_name"):
                if invocation_params.get(key):
                    return invocation_params[key]

        return None

    def _set_span_attributes(
        self,
        span: Span,
        run_info: dict,
        op: SerializedRunOperation,
    ) -> None:
        """Set attributes on the span.

        Args:
            span: The span to set attributes on.
            run_info: The deserialized run info.
            op: The serialized run operation.
        """
        # Set LangSmith-specific attributes
        for key, attribute in (
            ("run_type", LANGSMITH_RUN_TYPE),
            ("name", LANGSMITH_NAME),
            ("session_id", LANGSMITH_SESSION_ID),
            ("session_name", LANGSMITH_SESSION_NAME),
        ):
            value = run_info.get(key)
            if value:
                span.set_attribute(attribute, str(value))

        # Set GenAI attributes according to OTEL semantic conventions
        # Set gen_ai.operation.name
        if op.operation == "post":
            # ``or`` also covers a run_type that is present but None/empty.
            operation_name = _get_operation_name(run_info.get("run_type") or "chain")
            span.set_attribute(GEN_AI_OPERATION_NAME, operation_name)

        # Set gen_ai.system
        self._set_gen_ai_system(span, run_info)

        # Set model name if available
        model_name = self._extract_model_name(run_info)
        if model_name:
            span.set_attribute(GEN_AI_REQUEST_MODEL, model_name)

        # Set token usage information
        for key, attribute in (
            ("prompt_tokens", GEN_AI_USAGE_INPUT_TOKENS),
            ("completion_tokens", GEN_AI_USAGE_OUTPUT_TOKENS),
            ("total_tokens", GEN_AI_USAGE_TOTAL_TOKENS),
        ):
            token_count = run_info.get(key)
            if token_count is not None:
                span.set_attribute(attribute, int(token_count))

        # Set other parameters from invocation_params
        self._set_invocation_parameters(span, run_info)

        # Set metadata and tags if available. ``extra``/``metadata`` may be
        # present but None, so fall back to an empty mapping in that case too.
        extra = run_info.get("extra") or {}
        metadata = extra.get("metadata") or {}
        for key, value in metadata.items():
            if value is not None:
                span.set_attribute(f"{LANGSMITH_METADATA}.{key}", value)

        tags = run_info.get("tags")
        if tags:
            if isinstance(tags, list):
                # str() keeps a stray non-string tag from aborting the export.
                span.set_attribute(LANGSMITH_TAGS, ", ".join(str(tag) for tag in tags))
            else:
                span.set_attribute(LANGSMITH_TAGS, tags)

        # Support additional serialized attributes, if present
        serialized = run_info.get("serialized")
        if serialized and isinstance(serialized, dict):
            for key, attribute in (
                ("name", GEN_AI_SERIALIZED_NAME),
                ("signature", GEN_AI_SERIALIZED_SIGNATURE),
                ("doc", GEN_AI_SERIALIZED_DOC),
            ):
                if serialized.get(key) is not None:
                    span.set_attribute(attribute, serialized[key])

        # Set inputs/outputs if available
        self._set_io_attributes(span, op)

    def _set_gen_ai_system(self, span: Span, run_info: dict) -> None:
        """Set the gen_ai.system attribute on the span based on the model provider.

        The provider is guessed from substrings of the model name; the first
        matching rule wins, so the order of the checks below matters.

        Args:
            span: The span to set attributes on.
            run_info: The deserialized run info.
        """
        # Default to "langchain" if we can't determine the system
        system = "langchain"

        # Extract model name to determine the system
        model_name = self._extract_model_name(run_info)
        if model_name:
            model_lower = str(model_name).lower()
            if "anthropic" in model_lower or model_lower.startswith("claude"):
                system = "anthropic"
            elif "bedrock" in model_lower:
                system = "aws.bedrock"
            elif "azure" in model_lower and "openai" in model_lower:
                system = "az.ai.openai"
            elif "azure" in model_lower and "inference" in model_lower:
                system = "az.ai.inference"
            elif "cohere" in model_lower:
                system = "cohere"
            elif "deepseek" in model_lower:
                system = "deepseek"
            elif "gemini" in model_lower:
                system = "gemini"
            elif "groq" in model_lower:
                system = "groq"
            elif "watson" in model_lower or "ibm" in model_lower:
                system = "ibm.watsonx.ai"
            elif "mistral" in model_lower:
                system = "mistral_ai"
            elif "gpt" in model_lower or "openai" in model_lower:
                system = "openai"
            elif "perplexity" in model_lower or "sonar" in model_lower:
                system = "perplexity"
            elif "vertex" in model_lower:
                system = "vertex_ai"
            elif "xai" in model_lower or "grok" in model_lower:
                system = "xai"
            elif "qwen" in model_lower:
                system = "qwen"

        span.set_attribute(GEN_AI_SYSTEM, system)
        # Also stash the value on the span object itself.
        setattr(span, "_gen_ai_system", system)

    def _set_invocation_parameters(self, span: Span, run_info: dict) -> None:
        """Set invocation parameters on the span.

        Args:
            span: The span to set attributes on.
            run_info: The deserialized run info.
        """
        extra = run_info.get("extra")
        metadata = extra.get("metadata") if extra else None
        if not metadata:
            return

        invocation_params = metadata.get("invocation_params")
        if not isinstance(invocation_params, dict):
            return

        # Set relevant invocation parameters
        for key, attribute in (
            ("max_tokens", GEN_AI_REQUEST_MAX_TOKENS),
            ("temperature", GEN_AI_REQUEST_TEMPERATURE),
            ("top_p", GEN_AI_REQUEST_TOP_P),
            ("frequency_penalty", GEN_AI_REQUEST_FREQUENCY_PENALTY),
            ("presence_penalty", GEN_AI_REQUEST_PRESENCE_PENALTY),
        ):
            if key in invocation_params:
                span.set_attribute(attribute, invocation_params[key])

    def _set_io_attributes(self, span: Span, op: SerializedRunOperation) -> None:
        """Set input/output attributes on the span.

        The raw serialized inputs/outputs are always recorded when they parse
        as JSON; structured request/response attributes are only extracted
        when the parsed payload is a JSON object.

        Args:
            span: The span to set attributes on.
            op: The serialized run operation.
        """
        if op.inputs:
            try:
                inputs = _orjson.loads(op.inputs)
                if isinstance(inputs, dict):
                    self._set_request_attributes(span, inputs)
                span.set_attribute(GENAI_PROMPT, op.inputs)
            except Exception:
                logger.debug(
                    "Failed to process inputs for run %s", op.id, exc_info=True
                )

        if op.outputs:
            try:
                outputs = _orjson.loads(op.outputs)
                # Non-dict outputs (lists, scalars) carry no structured fields
                # but must still be recorded as the completion below.
                if isinstance(outputs, dict):
                    self._set_response_attributes(span, outputs)
                span.set_attribute(GENAI_COMPLETION, op.outputs)
            except Exception:
                logger.debug(
                    "Failed to process outputs for run %s", op.id, exc_info=True
                )

    def _set_request_attributes(self, span: Span, inputs: dict) -> None:
        """Set request attributes taken from a parsed inputs object."""
        if inputs.get("model") is not None and isinstance(
            inputs.get("messages"), list
        ):
            span.set_attribute(GEN_AI_REQUEST_MODEL, inputs["model"])

        # Set additional request attributes if available.
        for key, attribute in (
            ("stream", LANGSMITH_REQUEST_STREAMING),
            ("extra_headers", LANGSMITH_REQUEST_HEADERS),
            ("extra_query", GEN_AI_REQUEST_EXTRA_QUERY),
            ("extra_body", GEN_AI_REQUEST_EXTRA_BODY),
        ):
            value = inputs.get(key)
            if value is not None:
                span.set_attribute(attribute, value)

    def _set_response_attributes(self, span: Span, outputs: dict) -> None:
        """Set response and usage attributes taken from a parsed outputs object."""
        # Extract token usage from outputs (for LLM runs)
        token_usage = self.get_unified_run_tokens(outputs)
        if token_usage:
            input_tokens, output_tokens = token_usage
            span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, input_tokens)
            span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, output_tokens)
            span.set_attribute(GEN_AI_USAGE_TOTAL_TOKENS, input_tokens + output_tokens)

        # A null model is skipped rather than recorded as the string "None".
        if outputs.get("model") is not None:
            span.set_attribute(GEN_AI_RESPONSE_MODEL, str(outputs["model"]))

        # Extract additional response attributes.
        if outputs.get("id") is not None:
            span.set_attribute(GEN_AI_RESPONSE_ID, outputs["id"])

        choices = outputs.get("choices")
        if isinstance(choices, list):
            finish_reasons = [
                str(choice["finish_reason"])
                for choice in choices
                if isinstance(choice, dict)
                and choice.get("finish_reason") is not None
            ]
            if finish_reasons:
                span.set_attribute(
                    GEN_AI_RESPONSE_FINISH_REASONS,
                    ", ".join(finish_reasons),
                )

        for key, attribute in (
            ("service_tier", GEN_AI_RESPONSE_SERVICE_TIER),
            ("system_fingerprint", GEN_AI_RESPONSE_SYSTEM_FINGERPRINT),
        ):
            if outputs.get(key) is not None:
                span.set_attribute(attribute, outputs[key])

        usage_metadata = outputs.get("usage_metadata")
        if isinstance(usage_metadata, dict):
            for key, attribute in (
                ("input_token_details", GEN_AI_USAGE_INPUT_TOKEN_DETAILS),
                ("output_token_details", GEN_AI_USAGE_OUTPUT_TOKEN_DETAILS),
            ):
                details = usage_metadata.get(key)
                if details is not None:
                    span.set_attribute(attribute, str(details))

    def _as_utc_nano(self, timestamp: Optional[str]) -> Optional[int]:
        """Convert an ISO-8601 timestamp to integer nanoseconds since the epoch.

        Args:
            timestamp: ISO-8601 formatted timestamp string, or None/empty.

        Returns:
            Nanoseconds since 1970-01-01T00:00:00Z, or None if ``timestamp`` is
            empty or cannot be parsed.
        """
        if not timestamp:
            return None
        try:
            iso_text = timestamp
            # ``fromisoformat`` only understands a trailing "Z" from Python 3.11.
            if isinstance(iso_text, str) and iso_text.endswith(("Z", "z")):
                iso_text = iso_text[:-1] + "+00:00"
            dt = datetime.datetime.fromisoformat(iso_text)
        except ValueError:
            logger.exception(f"Failed to parse timestamp {timestamp}")
            return None

        # NOTE(review): a naive timestamp is interpreted as local time by
        # ``astimezone`` - confirm that callers always send an offset.
        epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
        elapsed = dt.astimezone(datetime.timezone.utc) - epoch
        # Pure integer arithmetic: going through a float epoch value would
        # lose precision once scaled to nanoseconds.
        return (elapsed // datetime.timedelta(microseconds=1)) * 1_000

    def get_unified_run_tokens(
        self, outputs: Optional[dict]
    ) -> Optional[tuple[int, int]]:
        """Find ``(input_tokens, output_tokens)`` in a run's outputs.

        Searches, in order: the top-level ``usage_metadata``; each dict value
        of ``outputs`` (its ``usage_metadata``, or ``kwargs.usage_metadata``
        when the value has ``lc == 1``); and finally
        ``generations[*][*].message.kwargs.usage_metadata``.

        Args:
            outputs: The parsed run outputs.

        Returns:
            The first token pair found, or None.
        """
        if not outputs or not isinstance(outputs, dict):
            return None

        # search in non-generations lists
        if output := self._extract_unified_run_tokens(outputs.get("usage_metadata")):
            return output

        # find if direct kwarg in outputs
        for haystack in outputs.values():
            if not haystack or not isinstance(haystack, dict):
                continue

            if output := self._extract_unified_run_tokens(
                haystack.get("usage_metadata")
            ):
                return output

            if (
                haystack.get("lc") == 1
                and isinstance(haystack.get("kwargs"), dict)
                and (
                    output := self._extract_unified_run_tokens(
                        haystack["kwargs"].get("usage_metadata")
                    )
                )
            ):
                return output

        # find in generations
        generations = outputs.get("generations") or []
        if not isinstance(generations, list):
            return None
        if generations and not isinstance(generations[0], list):
            # A flat list of generations is treated as a single group.
            generations = [generations]

        for group in generations:
            if not isinstance(group, list):
                continue
            for generation in group:
                if not isinstance(generation, dict):
                    continue
                message = generation.get("message")
                if not isinstance(message, dict):
                    continue
                kwargs = message.get("kwargs")
                if not isinstance(kwargs, dict):
                    continue
                if output := self._extract_unified_run_tokens(
                    kwargs.get("usage_metadata")
                ):
                    return output
        return None

    def _extract_unified_run_tokens(
        self, outputs: Optional[Any]
    ) -> Optional[tuple[int, int]]:
        """Return ``(input_tokens, output_tokens)`` from a usage-metadata dict.

        Returns None unless ``outputs`` is a non-empty dict whose
        ``input_tokens`` and ``output_tokens`` entries are both ints.
        """
        if not outputs or not isinstance(outputs, dict):
            return None

        if "input_tokens" not in outputs or "output_tokens" not in outputs:
            return None

        input_tokens = outputs["input_tokens"]
        output_tokens = outputs["output_tokens"]
        if not isinstance(input_tokens, int) or not isinstance(output_tokens, int):
            return None

        return input_tokens, output_tokens