Coverage for langsmith/run_trees.py: 16%

517 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-12-11 16:15 -0800

1"""Schemas for the LangSmith API.""" 

2 

3from __future__ import annotations 

4 

5import functools 

6import json 

7import logging 

8import sys 

9from collections.abc import Mapping, Sequence 

10from datetime import datetime, timezone 

11from typing import Any, Optional, Union, cast 

12from uuid import NAMESPACE_DNS, UUID, uuid5 

13 

14from typing_extensions import TypedDict 

15 

16from langsmith._internal._uuid import uuid7 

17from langsmith.uuid import uuid7_from_datetime 

18 

19try: 

20 from pydantic.v1 import Field, root_validator # type: ignore[import] 

21except ImportError: 

22 from pydantic import ( # type: ignore[assignment, no-redef] 

23 Field, 

24 root_validator, 

25 ) 

26 

27import contextvars 

28import threading 

29import urllib.parse 

30 

31import langsmith._internal._context as _context 

32from langsmith import schemas as ls_schemas 

33from langsmith import utils 

34from langsmith.client import ID_TYPE, RUN_TYPE_T, Client, _dumps_json, _ensure_uuid 

35 

36logger = logging.getLogger(__name__) 

37 

38 

class WriteReplica(TypedDict, total=False):
    """Destination that a run is additionally written to.

    All keys are optional (``total=False``). ``RunTree.post`` and
    ``RunTree.patch`` read these keys with ``.get`` for every replica.
    """

    # Passed as ``api_url`` to the client's create/update call.
    api_url: Optional[str]
    # Passed as ``api_key`` to the client's create/update call.
    api_key: Optional[str]
    # Target project; the run's own ``session_name`` is used when missing/falsy.
    project_name: Optional[str]
    # Extra fields merged into the remapped run dict (see
    # ``RunTree._remap_for_project``, which also checks a "reroot" key here).
    updates: Optional[dict]

44 

45 

# Prefix shared by the LangSmith header / baggage keys defined below.
LANGSMITH_PREFIX = "langsmith-"
# Header name under which the dotted order is sent (see RunTree.to_headers).
LANGSMITH_DOTTED_ORDER = sys.intern(f"{LANGSMITH_PREFIX}trace")
# Same header name as bytes, for header mappings keyed by bytes.
LANGSMITH_DOTTED_ORDER_BYTES = LANGSMITH_DOTTED_ORDER.encode("utf-8")
# Baggage member keys parsed by _Baggage.from_header.
LANGSMITH_METADATA = sys.intern(f"{LANGSMITH_PREFIX}metadata")
LANGSMITH_TAGS = sys.intern(f"{LANGSMITH_PREFIX}tags")
LANGSMITH_PROJECT = sys.intern(f"{LANGSMITH_PREFIX}project")
LANGSMITH_REPLICAS = sys.intern(f"{LANGSMITH_PREFIX}replicas")
# Key stored in RunTree.extra by RunTree.set(outputs=...); when truthy,
# RunTree.end does not merge the outputs passed to it.
OVERRIDE_OUTPUTS = sys.intern("__omit_auto_outputs")
# Sentinel default that distinguishes "argument omitted" from an explicit None.
NOT_PROVIDED = cast(None, object())
# Held while the module-level cached client is created or replaced.
# NOTE(review): get_cached_client/configure use a module global `_CLIENT`;
# its initial assignment is not visible in this part of the file - confirm
# it is defined at module level.
_LOCK = threading.Lock()

# Context variables
# Default replicas picked up by RunTree.infer_defaults when none are passed.
_REPLICAS = contextvars.ContextVar[Optional[Sequence[WriteReplica]]](
    "_REPLICAS", default=None
)

# Set by RunTree.from_headers to the id of the run it creates; read by
# RunTree._remap_for_project when a replica requests "reroot".
_DISTRIBUTED_PARENT_ID = contextvars.ContextVar[Optional[str]](
    "_DISTRIBUTED_PARENT_ID", default=None
)

# Sentinel default used by configure() so that None can mean "clear".
_SENTINEL = cast(None, object())

# Number of trailing characters of a dotted-order segment that are sliced off
# and used as the run id (36 is the length of a canonical UUID string).
TIMESTAMP_LENGTH = 36

69 

70 

# Note, this is called directly by langchain. Do not remove.
def get_cached_client(**init_kwargs: Any) -> Client:
    """Return the module-wide `Client`, constructing it on first use.

    Args:
        **init_kwargs: Forwarded to `Client(...)` only when no client is
            cached yet; they are ignored once a client exists.

    Returns:
        The cached client instance.
    """
    global _CLIENT
    if _CLIENT is not None:
        return _CLIENT
    with _LOCK:
        # Re-check while holding the lock: another thread may have stored a
        # client between the check above and acquiring the lock.
        if _CLIENT is None:
            _CLIENT = Client(**init_kwargs)
    return _CLIENT

79 

80 

def configure(
    client: Optional[Client] = _SENTINEL,
    enabled: Optional[bool] = _SENTINEL,
    project_name: Optional[str] = _SENTINEL,
    tags: Optional[list[str]] = _SENTINEL,
    metadata: Optional[dict[str, Any]] = _SENTINEL,
):
    """Configure global LangSmith tracing context.

    Every argument that is supplied is written both to its context variable
    and to the matching module-level global in `langsmith._internal._context`
    (the client is stored in this module's cached client instead). Arguments
    that are omitted leave the current setting untouched.

    Call this once at startup to configure global settings in code. To
    configure tracing for a single invocation only, use the
    `tracing_context` context manager instead.

    Args:
        client: A LangSmith Client instance to use for all tracing operations.

            If provided, this client is used instead of creating new clients.

            Pass `None` to explicitly clear the global client.
        enabled: Whether tracing is enabled.

            Can be:

            - `True`: Enable tracing and send data to LangSmith
            - `False`: Disable tracing completely
            - `'local'`: Enable tracing but only store data locally
            - `None`: Clear the setting (falls back to environment variables)
        project_name: The LangSmith project name where traces will be sent.

            Pass `None` to explicitly clear the project name.
        tags: A list of tags to be applied to all traced runs.

            Pass `None` to explicitly clear all global tags.
        metadata: A dictionary of metadata to attach to all traced runs.

            Pass `None` to explicitly clear all global metadata.

    Examples:
        Basic configuration:
        >>> import langsmith as ls
        >>> # Enable tracing with a specific project
        >>> ls.configure(enabled=True, project_name="my-project")

        Set global trace masking:
        >>> def hide_keys(data):
        ...     if not data:
        ...         return {}
        ...     return {k: v for k, v in data.items() if k not in ["key1", "key2"]}
        >>> ls.configure(
        ...     client=ls.Client(
        ...         hide_inputs=hide_keys,
        ...         hide_outputs=hide_keys,
        ...     )
        ... )

        Adding global tags and metadata:
        >>> ls.configure(
        ...     tags=["production", "v1.0"],
        ...     metadata={"environment": "prod", "version": "1.0.0"},
        ... )

        Disabling tracing:
        >>> ls.configure(enabled=False)
    """
    global _CLIENT
    # (context variable on `_context`, module-level mirror, supplied value)
    context_settings = (
        ("_TRACING_ENABLED", "_GLOBAL_TRACING_ENABLED", enabled),
        ("_PROJECT_NAME", "_GLOBAL_PROJECT_NAME", project_name),
        ("_TAGS", "_GLOBAL_TAGS", tags),
        ("_METADATA", "_GLOBAL_METADATA", metadata),
    )
    with _LOCK:
        if client is not _SENTINEL:
            _CLIENT = client
        for var_name, global_name, value in context_settings:
            if value is _SENTINEL:
                # Argument omitted: keep the current setting.
                continue
            getattr(_context, var_name).set(value)
            setattr(_context, global_name, value)

172 

173 

def validate_extracted_usage_metadata(
    data: ls_schemas.ExtractedUsageMetadata,
) -> ls_schemas.ExtractedUsageMetadata:
    """Check that a usage-metadata dict uses only the recognised keys.

    Args:
        data: The usage metadata mapping to check.

    Returns:
        The same `data` object, unchanged.

    Raises:
        ValueError: If `data` contains any key outside the allowed set.
    """
    recognised = frozenset(
        (
            "input_tokens",
            "output_tokens",
            "total_tokens",
            "input_token_details",
            "output_token_details",
            "input_cost",
            "output_cost",
            "total_cost",
            "input_cost_details",
            "output_cost_details",
        )
    )
    extra_keys = set(data).difference(recognised)
    if not extra_keys:
        return data  # type: ignore
    raise ValueError(f"Unexpected keys in usage metadata: {extra_keys}")

195 

196 

class RunTree(ls_schemas.RunBase):
    """Run Schema with back-references for posting runs."""

    name: str
    # Defaults to a fresh uuid7; infer_defaults may instead derive the id
    # from `start_time` when no id is passed.
    id: UUID = Field(default_factory=uuid7)
    run_type: str = Field(default="chain")
    # Defaults to the current time in UTC.
    start_time: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    # Note: no longer set.
    parent_run: Optional[RunTree] = Field(default=None, exclude=True)
    # Dotted order of the parent; filled by infer_defaults from `parent_run`
    # and consumed by ensure_dotted_order. Excluded from serialization.
    parent_dotted_order: Optional[str] = Field(default=None, exclude=True)
    # Children are serialized without their `parent_run_id`.
    child_runs: list[RunTree] = Field(
        default_factory=list,
        exclude={"__all__": {"parent_run_id"}},
    )
    # Project name; may also be populated through the alias `project_name`.
    session_name: str = Field(
        default_factory=lambda: utils.get_tracer_project() or "default",
        alias="project_name",
    )
    # Project id; may also be populated through the alias `project_id`.
    session_id: Optional[UUID] = Field(default=None, alias="project_id")
    # Free-form extras; this class stores "metadata" and internal flags here.
    extra: dict = Field(default_factory=dict)
    tags: Optional[list[str]] = Field(default_factory=list)
    events: list[dict] = Field(default_factory=list)
    """List of events associated with the run, like
    start and end events."""
    # Client used for API calls; resolved lazily by the `client` property.
    ls_client: Optional[Any] = Field(default=None, exclude=True)
    dotted_order: str = Field(
        default="", description="The order of the run in the tree."
    )
    # The "" default is a placeholder; infer_defaults always assigns a value.
    trace_id: UUID = Field(default="", description="The trace id of the run.")  # type: ignore
    dangerously_allow_filesystem: Optional[bool] = Field(
        default=False, description="Whether to allow filesystem access for attachments."
    )
    replicas: Optional[Sequence[WriteReplica]] = Field(
        default=None,
        description="Projects to replicate this run to with optional updates.",
    )

    class Config:
        """Pydantic model configuration."""

        arbitrary_types_allowed = True
        # Lets `session_name` / `session_id` be passed by field name as well
        # as by their aliases.
        allow_population_by_field_name = True
        # Unknown constructor arguments are dropped rather than rejected.
        extra = "ignore"

240 

@root_validator(pre=True)
def infer_defaults(cls, values: dict) -> dict:
    """Normalise raw constructor arguments before field validation.

    Fills in the run name, client, parent linkage, id, trace id, empty
    containers and replicas. `values` is modified in place and returned.
    """
    # Name: fall back to the serialized payload, then to "Unnamed".
    serialized = values.get("serialized")
    if values.get("name") is None and serialized is not None:
        if "name" in serialized:
            values["name"] = serialized["name"]
        elif "id" in serialized:
            values["name"] = serialized["id"][-1]
    if values.get("name") is None:
        values["name"] = "Unnamed"

    # Client: accept the `client` / `_client` spellings (first one wins).
    for client_key in ("client", "_client"):
        if client_key in values:
            values["ls_client"] = values.pop(client_key)
            break
    if not values.get("ls_client"):
        values["ls_client"] = None

    # Parent: keep only the id and dotted order, not the parent object.
    parent = values.pop("parent_run", None)
    has_parent = parent is not None
    if has_parent:
        values["parent_run_id"] = parent.id
        values["parent_dotted_order"] = parent.dotted_order

    if "id" not in values:
        # Generate UUID from start_time if available
        started = values.get("start_time")
        values["id"] = uuid7() if started is None else uuid7_from_datetime(started)
    if "trace_id" not in values:
        # A run without a parent is the root of its own trace.
        values["trace_id"] = parent.trace_id if has_parent else values["id"]

    values.setdefault("extra", {})
    for key, make_empty in (
        ("events", list),
        ("tags", list),
        ("outputs", dict),
        ("attachments", dict),
    ):
        if values.get(key) is None:
            values[key] = make_empty()

    if values.get("replicas") is None:
        values["replicas"] = _REPLICAS.get()
    values["replicas"] = _ensure_write_replicas(values["replicas"])
    return values

285 

@root_validator(pre=False)
def ensure_dotted_order(cls, values: dict) -> dict:
    """Fill in `dotted_order` when it is missing or blank.

    The run's own segment is built from `start_time` and `id`; when a
    `parent_dotted_order` is present the segment is appended to it after
    a ".". A non-blank existing `dotted_order` is left untouched.
    """
    existing = values.get("dotted_order")
    if existing and existing.strip():
        return values
    own_segment = _create_current_dotted_order(values["start_time"], values["id"])
    parent_order = values.get("parent_dotted_order")
    if parent_order is None:
        values["dotted_order"] = own_segment
    else:
        values["dotted_order"] = parent_order + "." + own_segment
    return values

301 

@property
def client(self) -> Client:
    """Return the client, resolving it on first access.

    When `ls_client` is unset, the shared client from `get_cached_client()`
    is stored on the run and returned, so a run that never makes API calls
    never triggers client creation.
    """
    resolved = self.ls_client
    if resolved is None:
        resolved = get_cached_client()
        self.ls_client = resolved
    return resolved

310 

@property
def _client(self) -> Optional[Client]:
    """Backwards-compatible alias that returns `ls_client` as stored."""
    stored = self.ls_client
    return stored

315 

def __setattr__(self, name, value):
    """Set an attribute, redirecting the legacy `_client` name to `ls_client`."""
    if name != "_client":
        return super().__setattr__(name, value)
    # For backwards compat
    self.ls_client = value

323 

def set(
    self,
    *,
    inputs: Optional[Mapping[str, Any]] = NOT_PROVIDED,
    outputs: Optional[Mapping[str, Any]] = NOT_PROVIDED,
    tags: Optional[Sequence[str]] = NOT_PROVIDED,
    metadata: Optional[Mapping[str, Any]] = NOT_PROVIDED,
    usage_metadata: Optional[ls_schemas.ExtractedUsageMetadata] = NOT_PROVIDED,
) -> None:
    """Set the inputs, outputs, tags, and metadata of the run.

    Only the arguments that are actually passed are applied. Setting
    `outputs` also stores a flag in `extra` that makes `end()` ignore the
    outputs handed to it (those that would otherwise be added by the
    @traceable decorator).

    If your LangChain or LangGraph versions are sufficiently up-to-date,
    this will also override the default behavior of `LangChainTracer`.

    Args:
        inputs: Replaces the run inputs; `None` stores an empty dict.
        outputs: Replaces the run outputs; `None` stores an empty dict.
        tags: Replaces the run tags; `None` stores an empty list.
        metadata: Merged into `extra["metadata"]`; `None` merges nothing.
        usage_metadata: Validated with `validate_extracted_usage_metadata`
            and stored under `extra["metadata"]["usage_metadata"]`.

    Returns:
        None
    """
    if tags is not NOT_PROVIDED:
        # `list(None)` raises TypeError; treat an explicit None like the
        # None handling of inputs/outputs below and store an empty value.
        self.tags = [] if tags is None else list(tags)
    if metadata is not NOT_PROVIDED:
        self.extra.setdefault("metadata", {}).update(metadata or {})
    if inputs is not NOT_PROVIDED:
        # Used by LangChain core to determine whether to
        # re-upload the inputs upon run completion
        self.extra["inputs_is_truthy"] = False
        self.inputs = {} if inputs is None else dict(inputs)
    if outputs is not NOT_PROVIDED:
        self.extra[OVERRIDE_OUTPUTS] = True
        self.outputs = {} if outputs is None else dict(outputs)
    if usage_metadata is not NOT_PROVIDED:
        # NOTE(review): an explicit `usage_metadata=None` is handed to the
        # validator unchanged - confirm whether None should be accepted.
        self.extra.setdefault("metadata", {})["usage_metadata"] = (
            validate_extracted_usage_metadata(usage_metadata)
        )

374 

def add_tags(self, tags: Union[Sequence[str], str]) -> None:
    """Append one tag (a plain string) or several tags to the run."""
    # A bare string is a single tag, not a sequence of characters.
    incoming = [tags] if isinstance(tags, str) else tags
    if self.tags is None:
        self.tags = []
    self.tags.extend(incoming)

382 

def add_metadata(self, metadata: dict[str, Any]) -> None:
    """Merge `metadata` into `extra["metadata"]`, creating it if needed."""
    if self.extra is None:
        self.extra = {}
    current = self.extra.setdefault("metadata", {})
    current.update(metadata)

389 

def add_outputs(self, outputs: dict[str, Any]) -> None:
    """Upsert the given outputs into the run.

    Existing keys are overwritten; a missing outputs dict is created first.

    Args:
        outputs: A dictionary containing the outputs to be added.
    """
    current = self.outputs
    if current is None:
        current = {}
        self.outputs = current
    current.update(outputs)

399 

def add_inputs(self, inputs: dict[str, Any]) -> None:
    """Upsert the given inputs into the run.

    Also sets `extra["inputs_is_truthy"]` to False.

    Args:
        inputs: A dictionary containing the inputs to be added.
    """
    current = self.inputs
    if current is None:
        current = {}
        self.inputs = current
    current.update(inputs)
    # Set to False so LangChain thinks it needs to
    # re-upload inputs
    self.extra["inputs_is_truthy"] = False

412 

def add_event(
    self,
    events: Union[
        ls_schemas.RunEvent,
        Sequence[ls_schemas.RunEvent],
        Sequence[dict],
        dict,
        str,
    ],
) -> None:
    """Add one or more events to the run's event list.

    Args:
        events: A single event dict, a sequence of events, or a string. A
            string is wrapped in an event named "event" that carries the
            current UTC time (ISO format) and the string as its message.

    Returns:
        None
    """
    if self.events is None:
        self.events = []
    if isinstance(events, str):
        events = {
            "name": "event",
            "time": datetime.now(timezone.utc).isoformat(),
            "message": events,
        }
    if isinstance(events, dict):
        self.events.append(events)  # type: ignore[arg-type]
    else:
        self.events.extend(events)  # type: ignore[arg-type]

446 

def end(
    self,
    *,
    outputs: Optional[dict] = None,
    error: Optional[str] = None,
    end_time: Optional[datetime] = None,
    events: Optional[Sequence[ls_schemas.RunEvent]] = None,
    metadata: Optional[dict[str, Any]] = None,
) -> None:
    """Mark the run as ended and record its final details.

    Args:
        outputs: Outputs to store or merge into existing outputs. Ignored
            when outputs were already fixed through `set(outputs=...)`.
        error: Error text to record on the run.
        end_time: End timestamp; defaults to the current UTC time.
        events: Events to append via `add_event`.
        metadata: Metadata to merge via `add_metadata`.
    """
    self.end_time = end_time or datetime.now(timezone.utc)
    # We've already 'set' the outputs, so ignore
    # the ones that are automatically included
    outputs_fixed = self.extra.get(OVERRIDE_OUTPUTS)
    if outputs is not None and not outputs_fixed:
        if self.outputs:
            self.outputs.update(outputs)
        else:
            self.outputs = outputs
    if error is not None:
        self.error = error
    if events is not None:
        self.add_event(events)
    if metadata is not None:
        self.add_metadata(metadata)

472 

def create_child(
    self,
    name: str,
    run_type: RUN_TYPE_T = "chain",
    *,
    run_id: Optional[ID_TYPE] = None,
    serialized: Optional[dict] = None,
    inputs: Optional[dict] = None,
    outputs: Optional[dict] = None,
    error: Optional[str] = None,
    reference_example_id: Optional[UUID] = None,
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None,
    tags: Optional[list[str]] = None,
    extra: Optional[dict] = None,
    attachments: Optional[ls_schemas.Attachments] = None,
) -> RunTree:
    """Create and return a new `RunTree` whose parent is this run.

    The child inherits this run's project name, replicas, client and
    `dangerously_allow_filesystem` setting. Omitted dict arguments become
    empty dicts and an omitted `start_time` becomes the current UTC time.
    This method does not append the child to `child_runs`.
    """
    child_kwargs: dict = {
        "name": name,
        "id": _ensure_uuid(run_id),
        "serialized": serialized or {"name": name},
        "inputs": inputs or {},
        "outputs": outputs or {},
        "error": error,
        "run_type": run_type,
        "reference_example_id": reference_example_id,
        "start_time": start_time or datetime.now(timezone.utc),
        "end_time": end_time,
        "extra": extra or {},
        "parent_run": self,
        "project_name": self.session_name,
        "replicas": self.replicas,
        "ls_client": self.ls_client,
        "tags": tags,
        "attachments": attachments or {},  # type: ignore
        "dangerously_allow_filesystem": self.dangerously_allow_filesystem,
    }
    return RunTree(**child_kwargs)

514 

def _get_dicts_safe(self):
    """Return the run as a dict without deep-copying inputs or outputs.

    Inputs and outputs are left out of the pydantic `dict()` call (things
    like generators cannot be copied) and added back as shallow copies.
    Input values that are `Attachment` instances are moved into the
    "attachments" entry instead of "inputs".
    """
    payload = self.dict(
        exclude={"child_runs", "inputs", "outputs"}, exclude_none=True
    )
    raw_inputs = self.inputs
    if raw_inputs is not None:
        # shallow copy. deep copying will occur in the client
        files = payload.get("attachments", {})
        plain_inputs = {}
        for key, value in raw_inputs.items():
            target = files if isinstance(value, ls_schemas.Attachment) else plain_inputs
            target[key] = value
        payload["inputs"] = plain_inputs
        if files:
            payload["attachments"] = files
    if self.outputs is not None:
        # shallow copy; deep copying will occur in the client
        payload["outputs"] = self.outputs.copy()
    return payload

536 

def _slice_parent_id(self, parent_id: str, run_dict: dict) -> None:
    """Cut `parent_id` and everything above it out of the dotted order.

    `run_dict` is modified in place. When a segment's id equals `parent_id`,
    the dotted order is trimmed to the segments after it and the trace id
    becomes the id of the first remaining segment (or the run's own id when
    nothing remains). If the run's direct parent is `parent_id`, the run is
    the new root and its `parent_run_id` is removed.
    """
    dotted_order = run_dict.get("dotted_order")
    if dotted_order:
        parent_id = str(parent_id)
        segments = dotted_order.split(".")
        # TODO(angus): potentially use binary search to find the index
        match_idx = next(
            (
                idx
                for idx, segment in enumerate(segments)
                if segment[-TIMESTAMP_LENGTH:] == parent_id
            ),
            None,
        )
        if match_idx is not None:
            # Keep only the segments after parent_id (exclusive).
            remaining = segments[match_idx + 1 :]
            run_dict["dotted_order"] = ".".join(remaining)
            run_dict["trace_id"] = (
                UUID(remaining[0][-TIMESTAMP_LENGTH:]) if remaining else run_dict["id"]
            )
    if str(run_dict.get("parent_run_id")) == parent_id:
        # We've found the new root node.
        run_dict.pop("parent_run_id", None)

566 

def _remap_for_project(
    self, project_name: str, updates: Optional[dict] = None
) -> dict:
    """Rewrite ids/dotted_order for a given project with optional updates.

    When `project_name` equals this run's own project, the plain run dict is
    returned as-is and `updates` are not applied. Otherwise the run id, trace
    id, parent id and every id inside the dotted order are replaced by
    `uuid5(NAMESPACE_DNS, "<old id>:<project_name>")`, the project is
    switched to `project_name`, and `updates` are merged on top.

    If `updates` has a truthy "reroot" entry and a distributed parent id is
    set, the run dict is first sliced at that parent.
    """
    run_dict = self._get_dicts_safe()
    if project_name == self.session_name:
        return run_dict

    if updates and updates.get("reroot", False):
        distributed_parent_id = _DISTRIBUTED_PARENT_ID.get()
        if distributed_parent_id:
            self._slice_parent_id(distributed_parent_id, run_dict)

    def remap(original: Any) -> UUID:
        # Map an id to its per-project replacement.
        return uuid5(NAMESPACE_DNS, f"{original}:{project_name}")

    new_id = remap(run_dict["id"])
    old_trace = run_dict.get("trace_id")
    old_parent = run_dict.get("parent_run_id")

    current_order = run_dict.get("dotted_order")
    if current_order:
        *ancestors, leaf = current_order.split(".")
        rebuilt = [
            segment[:-TIMESTAMP_LENGTH] + str(remap(segment[-TIMESTAMP_LENGTH:]))
            for segment in ancestors
        ]
        # The last segment belongs to this run and takes the new run id.
        rebuilt.append(leaf[:-TIMESTAMP_LENGTH] + str(new_id))
        new_order = ".".join(rebuilt)
    else:
        new_order = None

    remapped = utils.deepish_copy(run_dict)
    remapped.update(
        {
            "id": new_id,
            "trace_id": remap(old_trace) if old_trace else None,
            "parent_run_id": remap(old_parent) if old_parent else None,
            "dotted_order": new_order,
            "session_name": project_name,
        }
    )
    if updates:
        remapped.update(updates)
    return remapped

620 

def post(self, exclude_child_runs: bool = True) -> None:
    """Send this run to the API through `client.create_run`.

    With replicas configured, one `create_run` call is made per replica
    using the run dict remapped for that replica's project; otherwise a
    single call is made with the plain run dict. If the run has
    attachments, an "uploaded_attachment" event listing their names is
    appended to `events`.

    Args:
        exclude_child_runs: When False, every run in `child_runs` is
            posted as well, recursively.
    """
    replicas = self.replicas
    if not replicas:
        payload = self._get_dicts_safe()
        self.client.create_run(**payload)
    else:
        for replica in replicas:
            target_project = replica.get("project_name") or self.session_name
            payload = self._remap_for_project(target_project, replica.get("updates"))
            self.client.create_run(
                **payload,
                api_key=replica.get("api_key"),
                api_url=replica.get("api_url"),
            )
    if self.attachments:
        # Remember which attachments were sent so patch() can skip them.
        self.events.append(
            {
                "name": "uploaded_attachment",
                "time": datetime.now(timezone.utc).isoformat(),
                "message": {str(name) for name in self.attachments},
            }
        )
    if exclude_child_runs:
        return
    for child in self.child_runs:
        child.post(exclude_child_runs=False)

648 

def patch(self, *, exclude_inputs: bool = False) -> None:
    """Send the run's current state to the API through `client.update_run`.

    The run is ended first if it has no end time. Tuple-valued attachments
    are sent, except those named in an earlier "uploaded_attachment" event.
    With replicas configured, one update is issued per replica using the
    run dict remapped for that replica's project.

    Args:
        exclude_inputs: Whether to exclude inputs from the patch request.
    """
    if not self.end_time:
        self.end()
    pending = {
        key: value for key, value in self.attachments.items() if isinstance(value, tuple)
    }
    try:
        # Avoid loading the same attachment twice
        if pending:
            marker = None
            for event in self.events:
                if event.get("name") == "uploaded_attachment":
                    marker = event
                    break
            if marker:
                already_sent = marker["message"]
                pending = {
                    key: value
                    for key, value in pending.items()
                    if key not in already_sent
                }
    except Exception as e:
        logger.warning(f"Error filtering attachments to upload: {e}")

    if not self.replicas:
        if exclude_inputs:
            inputs_payload = None
        else:
            inputs_payload = self.inputs.copy() if self.inputs else None
        self.client.update_run(
            name=self.name,
            run_id=self.id,
            run_type=cast(RUN_TYPE_T, self.run_type),
            start_time=self.start_time,
            inputs=inputs_payload,
            outputs=self.outputs.copy() if self.outputs else None,
            error=self.error,
            parent_run_id=self.parent_run_id,
            session_name=self.session_name,
            reference_example_id=self.reference_example_id,
            end_time=self.end_time,
            dotted_order=self.dotted_order,
            trace_id=self.trace_id,
            events=self.events,
            tags=self.tags,
            extra=self.extra,
            attachments=pending,
        )
        return

    for replica in self.replicas:
        target_project = replica.get("project_name") or self.session_name
        remapped = self._remap_for_project(target_project, replica.get("updates"))
        self.client.update_run(
            name=remapped["name"],
            run_id=remapped["id"],
            run_type=remapped.get("run_type"),
            start_time=remapped.get("start_time"),
            inputs=None if exclude_inputs else remapped["inputs"],
            outputs=remapped["outputs"],
            error=remapped.get("error"),
            parent_run_id=remapped.get("parent_run_id"),
            session_name=remapped.get("session_name"),
            reference_example_id=remapped.get("reference_example_id"),
            end_time=remapped.get("end_time"),
            dotted_order=remapped.get("dotted_order"),
            trace_id=remapped.get("trace_id"),
            events=remapped.get("events"),
            tags=remapped.get("tags"),
            extra=remapped.get("extra"),
            attachments=pending,
            api_key=replica.get("api_key"),
            api_url=replica.get("api_url"),
        )

729 

def wait(self) -> None:
    """Do nothing; this method is a no-op and always returns None."""
    return None

733 

def get_url(self) -> str:
    """Return the URL of the run, as reported by `client.get_run_url`."""
    run_url = self.client.get_run_url(run=self)
    return run_url

737 

@classmethod
def from_dotted_order(
    cls,
    dotted_order: str,
    **kwargs: Any,
) -> RunTree:
    """Create a new 'child' span from the provided dotted order.

    The dotted order is wrapped in a one-entry header mapping and handed to
    `from_headers` together with `kwargs`.

    Returns:
        RunTree: The new span.
    """
    span = cls.from_headers({LANGSMITH_DOTTED_ORDER: dotted_order}, **kwargs)  # type: ignore[arg-type]
    return cast(RunTree, span)

753 

@classmethod
def from_runnable_config(
    cls,
    config: Optional[dict],
    **kwargs: Any,
) -> Optional[RunTree]:
    """Create a new 'child' span from the provided runnable config.

    Requires `langchain-core` to be installed.

    The parent run is looked up through the `LangChainTracer` found among
    the config's callback handlers. When the tracer still holds the parent
    run, the run's type, inputs, outputs, times, name, tags and metadata are
    copied into the new span's arguments; tags passed by the caller are
    merged with the run's tags.

    Args:
        config: The runnable config; `None` resolves to `ensure_config(None)`.
        **kwargs: Extra arguments forwarded to `RunTree.from_dotted_order`.

    Returns:
        The new span or `None` if no parent span information is found.

    Raises:
        ImportError: If `langchain-core` cannot be imported.
    """
    try:
        from langchain_core.callbacks.manager import (
            AsyncCallbackManager,
            CallbackManager,
        )
        from langchain_core.runnables import RunnableConfig, ensure_config
        from langchain_core.tracers.langchain import LangChainTracer
    except ImportError as e:
        raise ImportError(
            "RunTree.from_runnable_config requires langchain-core to be installed. "
            "You can install it with `pip install langchain-core`."
        ) from e
    if config is None:
        config_ = ensure_config(None)
    else:
        config_ = cast(RunnableConfig, config)

    callbacks = config_.get("callbacks")
    if not (
        callbacks
        and isinstance(callbacks, (CallbackManager, AsyncCallbackManager))
        and callbacks.parent_run_id
    ):
        return None
    tracer = next(
        (h for h in callbacks.handlers if isinstance(h, LangChainTracer)),
        None,
    )
    if not tracer:
        return None

    run = tracer.run_map.get(str(callbacks.parent_run_id))
    if run and run.dotted_order:
        dotted_order = run.dotted_order
        kwargs["run_type"] = run.run_type
        kwargs["inputs"] = run.inputs
        kwargs["outputs"] = run.outputs
        kwargs["start_time"] = run.start_time
        kwargs["end_time"] = run.end_time
        # Union of the run's tags and the caller's tags. The operands are
        # combined explicitly: `a or [] + b` binds as `a or ([] + b)` and
        # would drop the caller's tags whenever the run has any.
        kwargs["tags"] = sorted(set(run.tags or []) | set(kwargs.get("tags") or []))
        kwargs["name"] = run.name
        extra_ = kwargs.setdefault("extra", {})
        metadata_ = extra_.setdefault("metadata", {})
        metadata_.update(run.metadata)
    elif (
        hasattr(tracer, "order_map")
        and callbacks.parent_run_id in tracer.order_map
    ):
        dotted_order = tracer.order_map[callbacks.parent_run_id][1]
    else:
        return None
    kwargs["client"] = tracer.client
    kwargs["project_name"] = tracer.project_name
    return RunTree.from_dotted_order(dotted_order, **kwargs)

817 

@classmethod
def from_headers(
    cls, headers: Mapping[Union[str, bytes], Union[str, bytes]], **kwargs: Any
) -> Optional[RunTree]:
    """Create a new 'parent' span from the provided headers.

    The dotted order is read from the trace header (str or bytes key); the
    trace id, run id and parent run id are taken from its segments.
    Metadata, tags, project name and replicas are read from the baggage
    header. Metadata passed in `kwargs["extra"]` wins over baggage metadata
    on key conflicts; tags from both sources are merged and sorted.

    As a side effect, the distributed parent id context variable is set to
    the new run's id (used for rerooting).

    Args:
        headers: Incoming headers, keyed by str or bytes.
        **kwargs: Extra `RunTree` constructor arguments. The passed objects
            are not modified.

    Returns:
        The new span or `None` if no parent span information is found.
    """
    init_args = kwargs.copy()

    langsmith_trace = cast(Optional[str], headers.get(LANGSMITH_DOTTED_ORDER))
    if not langsmith_trace:
        langsmith_trace_bytes = cast(
            Optional[bytes], headers.get(LANGSMITH_DOTTED_ORDER_BYTES)
        )
        if not langsmith_trace_bytes:
            return None
        langsmith_trace = langsmith_trace_bytes.decode("utf-8")

    parent_dotted_order = langsmith_trace.strip()
    parsed_dotted_order = _parse_dotted_order(parent_dotted_order)
    init_args["trace_id"] = parsed_dotted_order[0][1]
    init_args["id"] = parsed_dotted_order[-1][1]
    init_args["dotted_order"] = parent_dotted_order
    if len(parsed_dotted_order) >= 2:
        # Has a parent
        init_args["parent_run_id"] = parsed_dotted_order[-2][1]
    # All placeholders. We assume the source process
    # handles the life-cycle of the run.
    init_args["start_time"] = init_args.get("start_time") or datetime.now(
        timezone.utc
    )
    init_args["run_type"] = init_args.get("run_type") or "chain"
    init_args["name"] = init_args.get("name") or "parent"

    baggage = _Baggage.from_headers(headers)
    if baggage.metadata or baggage.tags:
        # `kwargs.copy()` is shallow, so build a new `extra` dict instead of
        # writing into the one owned by the caller.
        extra = dict(init_args.get("extra") or {})
        extra["metadata"] = {**baggage.metadata, **(extra.get("metadata") or {})}
        init_args["extra"] = extra
        # `tags` may be passed explicitly as None; treat that as no tags.
        own_tags = list(init_args.get("tags") or [])
        init_args["tags"] = sorted(set(baggage.tags + own_tags))
    if baggage.project_name:
        init_args["project_name"] = baggage.project_name
    if baggage.replicas:
        init_args["replicas"] = baggage.replicas

    run_tree = RunTree(**init_args)

    # Set the distributed parent ID to this run's ID for rerooting
    _DISTRIBUTED_PARENT_ID.set(str(run_tree.id))

    return run_tree

882 

def to_headers(self) -> dict[str, str]:
    """Return the `RunTree` as a dictionary of headers.

    The trace header (holding the dotted order) is included only when the
    run has a trace id; the "baggage" header is always included and carries
    the run's metadata, tags, project name and replicas.
    """
    headers: dict[str, str] = {}
    if self.trace_id:
        headers[LANGSMITH_DOTTED_ORDER] = self.dotted_order
    headers["baggage"] = _Baggage(
        metadata=self.extra.get("metadata", {}),
        tags=self.tags,
        project_name=self.session_name,
        replicas=self.replicas,
    ).to_header()
    return headers

896 

def __repr__(self):
    """Return a short description with id, name, run type and dotted order."""
    parts = (
        f"id={self.id}",
        f"name='{self.name}'",
        f"run_type='{self.run_type}'",
        f"dotted_order='{self.dotted_order}'",
    )
    return "RunTree(" + ", ".join(parts) + ")"

903 

904 

905class _Baggage: 

906 """Baggage header information.""" 

907 

def __init__(
    self,
    metadata: Optional[dict[str, str]] = None,
    tags: Optional[list[str]] = None,
    project_name: Optional[str] = None,
    replicas: Optional[Sequence[WriteReplica]] = None,
):
    """Initialize the Baggage object.

    Missing or empty `metadata`, `tags` and `replicas` are stored as a new
    empty dict/list; non-empty values are stored as passed (not copied).
    """
    self.metadata = metadata if metadata else {}
    self.tags = tags if tags else []
    self.project_name = project_name
    self.replicas = replicas if replicas else []

920 

@classmethod
def from_header(cls, header_value: Optional[str]) -> _Baggage:
    """Create a Baggage object from the given header value.

    The header is a comma-separated list of `key=value` members. Only the
    LangSmith metadata, tags, project and replicas keys are read; other
    members are ignored. Whitespace around keys and values is trimmed and
    members without "=" are skipped, so a malformed or foreign entry does
    not discard the LangSmith entries that follow it.

    Errors while decoding a value are logged as a warning; whatever was
    parsed before the error is still returned.

    Args:
        header_value: The raw baggage header, or `None`/empty.

    Returns:
        The parsed baggage (empty when `header_value` is falsy).
    """
    if not header_value:
        return cls()
    metadata = {}
    tags = []
    project_name = None
    replicas: Optional[list[WriteReplica]] = None
    try:
        for item in header_value.split(","):
            key, separator, value = item.partition("=")
            if not separator:
                # Not a key=value member (e.g. an empty entry left by a
                # trailing comma); nothing to read from it.
                continue
            # Members may carry optional whitespace after the comma.
            key = key.strip()
            value = value.strip()
            if key == LANGSMITH_METADATA:
                metadata = json.loads(urllib.parse.unquote(value))
            elif key == LANGSMITH_TAGS:
                tags = urllib.parse.unquote(value).split(",")
            elif key == LANGSMITH_PROJECT:
                project_name = urllib.parse.unquote(value)
            elif key == LANGSMITH_REPLICAS:
                replicas_data = json.loads(urllib.parse.unquote(value))
                parsed_replicas: list[WriteReplica] = []
                for replica_item in replicas_data:
                    if (
                        isinstance(replica_item, (tuple, list))
                        and len(replica_item) == 2
                    ):
                        # Convert legacy format to WriteReplica
                        parsed_replicas.append(
                            WriteReplica(
                                api_url=None,
                                api_key=None,
                                project_name=str(replica_item[0]),
                                updates=replica_item[1],
                            )
                        )
                    elif isinstance(replica_item, dict):
                        # New WriteReplica format: preserve as dict
                        parsed_replicas.append(cast(WriteReplica, replica_item))
                    else:
                        logger.warning(
                            f"Unknown replica format in baggage: {replica_item}"
                        )
                replicas = parsed_replicas
    except Exception as e:
        logger.warning(f"Error parsing baggage header: {e}")

    return cls(
        metadata=metadata, tags=tags, project_name=project_name, replicas=replicas
    )

971 

972 @classmethod 

973 def from_headers(cls, headers: Mapping[Union[str, bytes], Any]) -> _Baggage: 

974 if "baggage" in headers: 

975 return cls.from_header(headers["baggage"]) 

976 elif b"baggage" in headers: 

977 return cls.from_header(cast(bytes, headers[b"baggage"]).decode("utf-8")) 

978 else: 

979 return cls.from_header(None) 

980 

981 def to_header(self) -> str: 

982 """Return the Baggage object as a header value.""" 

983 items = [] 

984 if self.metadata: 

985 serialized_metadata = _dumps_json(self.metadata) 

986 items.append( 

987 f"{LANGSMITH_PREFIX}metadata={urllib.parse.quote(serialized_metadata)}" 

988 ) 

989 if self.tags: 

990 serialized_tags = ",".join(self.tags) 

991 items.append( 

992 f"{LANGSMITH_PREFIX}tags={urllib.parse.quote(serialized_tags)}" 

993 ) 

994 if self.project_name: 

995 items.append( 

996 f"{LANGSMITH_PREFIX}project={urllib.parse.quote(self.project_name)}" 

997 ) 

998 if self.replicas: 

999 serialized_replicas = _dumps_json(self.replicas) 

1000 items.append( 

1001 f"{LANGSMITH_PREFIX}replicas={urllib.parse.quote(serialized_replicas)}" 

1002 ) 

1003 return ",".join(items) 

1004 

1005 

@functools.lru_cache(maxsize=1)
def _parse_write_replicas_from_env_var(env_var: Optional[str]) -> list[WriteReplica]:
    """Parse the raw LANGSMITH_RUNS_ENDPOINTS value into write replicas.

    Two JSON shapes are accepted:

    * a list of objects, ``[{"api_url": "x", "api_key": "y"}]``;
    * an object mapping URL to API key, ``{"url": "key"}``.

    Trailing slashes are removed from URLs. Entries with the wrong types are
    logged and skipped. Any other JSON type, or a value that fails to parse,
    is logged and produces an empty list. A ``LangSmithUserError`` raised
    while handling the value is propagated to the caller.
    """
    if not env_var:
        return []

    try:
        payload = json.loads(env_var)

        if isinstance(payload, dict):
            # Checked only for the mapping form, before any entry is read.
            _check_endpoint_env_unset(payload)

            result = []
            for raw_url, secret in payload.items():
                endpoint = raw_url.rstrip("/")
                if not isinstance(secret, str):
                    logger.warning(
                        f"Invalid value type in LANGSMITH_RUNS_ENDPOINTS for URL "
                        f"{endpoint}: "
                        f"expected string, got {type(secret).__name__}"
                    )
                    continue
                result.append(
                    WriteReplica(
                        api_url=endpoint,
                        api_key=secret,
                        project_name=None,
                        updates=None,
                    )
                )
            return result

        if not isinstance(payload, list):
            logger.warning(
                f"Invalid LANGSMITH_RUNS_ENDPOINTS – must be valid JSON list of "
                "objects with api_url and api_key properties, or object mapping "
                f"url->apiKey, got {type(payload).__name__}"
            )
            return []

        result = []
        for entry in payload:
            if not isinstance(entry, dict):
                logger.warning(
                    f"Invalid item type in LANGSMITH_RUNS_ENDPOINTS: "
                    f"expected dict, got {type(entry).__name__}"
                )
                continue

            endpoint, secret = entry.get("api_url"), entry.get("api_key")
            if not isinstance(endpoint, str):
                logger.warning(
                    f"Invalid api_url type in LANGSMITH_RUNS_ENDPOINTS: "
                    f"expected string, got {type(endpoint).__name__}"
                )
            elif not isinstance(secret, str):
                logger.warning(
                    f"Invalid api_key type in LANGSMITH_RUNS_ENDPOINTS: "
                    f"expected string, got {type(secret).__name__}"
                )
            else:
                result.append(
                    WriteReplica(
                        api_url=endpoint.rstrip("/"),
                        api_key=secret,
                        project_name=None,
                        updates=None,
                    )
                )
        return result
    except utils.LangSmithUserError:
        # Configuration conflicts must reach the user rather than be logged.
        raise
    except Exception as e:
        logger.warning(
            "Invalid LANGSMITH_RUNS_ENDPOINTS – must be valid JSON list of "
            f"objects with api_url and api_key properties, or object mapping"
            f" url->apiKey: {e}"
        )
        return []

1095 

1096 

def _get_write_replicas_from_env() -> list[WriteReplica]:
    """Get write replicas from LANGSMITH_RUNS_ENDPOINTS environment variable.

    Returns:
        A new list on every call, so callers may modify the list they
        receive without affecting later calls.
    """
    env_var = utils.get_env_var("RUNS_ENDPOINTS")

    # The parser is wrapped in `functools.lru_cache`, which hands back the
    # same list object for a repeated value. Copy it so that a caller
    # mutating the result cannot alter what the cache returns next time.
    return list(_parse_write_replicas_from_env_var(env_var))

1102 

1103 

def _check_endpoint_env_unset(parsed: dict[str, str]) -> None:
    """Reject a runs-endpoints mapping when a single endpoint is also set.

    Args:
        parsed: The mapping decoded from LANGSMITH_RUNS_ENDPOINTS.

    Raises:
        utils.LangSmithUserError: If ``parsed`` is non-empty and either
            ``LANGSMITH_ENDPOINT`` or ``LANGCHAIN_ENDPOINT`` is set to a
            non-empty value.
    """
    import os

    if not parsed:
        # Nothing configured through the runs endpoints, so no conflict.
        return

    endpoint_vars = ("LANGSMITH_ENDPOINT", "LANGCHAIN_ENDPOINT")
    if any(os.getenv(var) for var in endpoint_vars):
        raise utils.LangSmithUserError(
            "You cannot provide both LANGSMITH_ENDPOINT / LANGCHAIN_ENDPOINT "
            "and LANGSMITH_RUNS_ENDPOINTS."
        )

1113 

1114 

def _ensure_write_replicas(
    replicas: Optional[Sequence[WriteReplica]],
) -> list[WriteReplica]:
    """Return the write replicas to use, as a list.

    When ``replicas`` is ``None`` the replicas configured through the
    environment are returned. Otherwise the given sequence is copied into a
    new list; an empty sequence yields an empty list.
    """
    if replicas is not None:
        # Entries are expected to already be WriteReplica dicts.
        return [*replicas]
    return _get_write_replicas_from_env()

1124 

1125 

def _parse_dotted_order(dotted_order: str) -> list[tuple[datetime, UUID]]:
    """Split a dotted order string into ``(timestamp, run id)`` pairs.

    Each ``.``-separated segment is read as a timestamp in
    ``%Y%m%dT%H%M%S%fZ`` form followed by a UUID. The datetimes returned by
    ``strptime`` here carry no tzinfo.
    """
    pairs: list[tuple[datetime, UUID]] = []
    for segment in dotted_order.split("."):
        # Despite its name, TIMESTAMP_LENGTH is used here as the width of
        # the trailing UUID text; everything before it is the timestamp.
        stamp_text = segment[:-TIMESTAMP_LENGTH]
        id_text = segment[-TIMESTAMP_LENGTH:]
        moment = datetime.strptime(stamp_text, "%Y%m%dT%H%M%S%fZ")
        pairs.append((moment, UUID(id_text)))
    return pairs

1136 

1137 

# Module-level client reference, initialized from the shared context module.
_CLIENT: Optional[Client] = _context._GLOBAL_CLIENT
# Names exported by a star-import of this module.
__all__ = ["RunTree"]

1140 

1141 

def _create_current_dotted_order(
    start_time: Optional[datetime], run_id: Optional[UUID]
) -> str:
    """Build the dotted-order segment for a single run.

    Args:
        start_time: The run's start time; the current UTC time is used when
            it is not provided.
        run_id: The run's id; one is derived from the timestamp when it is
            not provided.

    Returns:
        The timestamp formatted as ``%Y%m%dT%H%M%S%fZ`` immediately followed
        by the string form of the id.
    """
    timestamp = start_time if start_time else datetime.now(timezone.utc)
    identifier = run_id if run_id else uuid7_from_datetime(timestamp)
    return f"{timestamp.strftime('%Y%m%dT%H%M%S%fZ')}{identifier}"
1148 return st.strftime("%Y%m%dT%H%M%S%fZ") + str(id_)