Coverage for langsmith/run_trees.py: 16%
517 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-12-11 16:15 -0800
1"""Schemas for the LangSmith API."""
3from __future__ import annotations
5import functools
6import json
7import logging
8import sys
9from collections.abc import Mapping, Sequence
10from datetime import datetime, timezone
11from typing import Any, Optional, Union, cast
12from uuid import NAMESPACE_DNS, UUID, uuid5
14from typing_extensions import TypedDict
16from langsmith._internal._uuid import uuid7
17from langsmith.uuid import uuid7_from_datetime
19try:
20 from pydantic.v1 import Field, root_validator # type: ignore[import]
21except ImportError:
22 from pydantic import ( # type: ignore[assignment, no-redef]
23 Field,
24 root_validator,
25 )
27import contextvars
28import threading
29import urllib.parse
31import langsmith._internal._context as _context
32from langsmith import schemas as ls_schemas
33from langsmith import utils
34from langsmith.client import ID_TYPE, RUN_TYPE_T, Client, _dumps_json, _ensure_uuid
logger = logging.getLogger(__name__)


class WriteReplica(TypedDict, total=False):
    """A destination that runs are replicated (written) to.

    Every key is optional. Missing values fall back to the run's own
    settings when the replica is used by ``RunTree.post`` / ``RunTree.patch``.
    """

    # API endpoint the replicated run is sent to (forwarded to the client).
    api_url: Optional[str]
    # API key used for that endpoint (forwarded to the client).
    api_key: Optional[str]
    # Target project; when unset the run's own ``session_name`` is used.
    project_name: Optional[str]
    # Fields merged into the remapped run payload (e.g. ``{"reroot": True}``).
    updates: Optional[dict]
# Prefix shared by every LangSmith-specific header and baggage key.
LANGSMITH_PREFIX = "langsmith-"
# Header carrying the dotted order of the run, in both str and bytes form so
# header mappings keyed by either type can be searched.
LANGSMITH_DOTTED_ORDER = sys.intern(LANGSMITH_PREFIX + "trace")
LANGSMITH_DOTTED_ORDER_BYTES = LANGSMITH_DOTTED_ORDER.encode("utf-8")
# Keys recognized inside the ``baggage`` header.
LANGSMITH_METADATA = sys.intern(LANGSMITH_PREFIX + "metadata")
LANGSMITH_TAGS = sys.intern(LANGSMITH_PREFIX + "tags")
LANGSMITH_PROJECT = sys.intern(LANGSMITH_PREFIX + "project")
LANGSMITH_REPLICAS = sys.intern(LANGSMITH_PREFIX + "replicas")
# Flag stored in ``RunTree.extra`` by ``RunTree.set(outputs=...)`` so that
# ``RunTree.end`` does not merge automatically supplied outputs afterwards.
OVERRIDE_OUTPUTS = sys.intern("__omit_auto_outputs")
# Sentinel meaning "argument not passed" (distinct from an explicit None).
NOT_PROVIDED = cast(None, object())
# Guards the module-level client and ``configure`` updates.
_LOCK = threading.Lock()

# Context variables
_REPLICAS = contextvars.ContextVar[Optional[Sequence[WriteReplica]]](
    "_REPLICAS", default=None
)

_DISTRIBUTED_PARENT_ID = contextvars.ContextVar[Optional[str]](
    "_DISTRIBUTED_PARENT_ID", default=None
)

# Sentinel used by ``configure`` to tell "not passed" apart from None.
_SENTINEL = cast(None, object())

# Number of trailing characters of a dotted-order segment that hold the run's
# UUID (a canonical UUID string is 36 characters long).
TIMESTAMP_LENGTH = 36
# Process-wide client shared by ``get_cached_client`` and ``configure``.
# Declared at module level so the first ``get_cached_client()`` call (before
# any ``configure(client=...)``) does not fail with NameError.
_CLIENT: Optional[Client] = None


# Note, this is called directly by langchain. Do not remove.
def get_cached_client(**init_kwargs: Any) -> Client:
    """Return the process-wide cached client, creating it on first use.

    Args:
        **init_kwargs: Keyword arguments forwarded to ``Client`` when no cached
            client exists yet. They are ignored once a client is cached.

    Returns:
        The shared ``Client`` instance.
    """
    global _CLIENT
    # Double-checked locking: take the lock only when no client exists yet,
    # then re-check under the lock so concurrent first calls build one client.
    if _CLIENT is None:
        with _LOCK:
            if _CLIENT is None:
                _CLIENT = Client(**init_kwargs)
    return _CLIENT
def configure(
    client: Optional[Client] = _SENTINEL,
    enabled: Optional[bool] = _SENTINEL,
    project_name: Optional[str] = _SENTINEL,
    tags: Optional[list[str]] = _SENTINEL,
    metadata: Optional[dict[str, Any]] = _SENTINEL,
):
    """Configure global LangSmith tracing context.

    Sets process-wide tracing options that apply to every traced operation
    performed afterwards. Only the arguments you pass are changed; omitted
    arguments keep their current value.

    Call this once at startup to configure tracing in code. To configure
    tracing for a single invocation instead, use the `tracing_context`
    context manager.

    Args:
        client: A LangSmith Client instance to use for all tracing operations.

            If provided, this client will be used instead of creating new clients.

            Pass `None` to explicitly clear the global client.
        enabled: Whether tracing is enabled.

            Can be:

            - `True`: Enable tracing and send data to LangSmith
            - `False`: Disable tracing completely
            - `'local'`: Enable tracing but only store data locally
            - `None`: Clear the setting (falls back to environment variables)
        project_name: The LangSmith project name where traces will be sent.

            This determines which project dashboard will display your traces.

            Pass `None` to explicitly clear the project name.
        tags: A list of tags to be applied to all traced runs.

            Tags are useful for filtering and organizing runs in the LangSmith UI.

            Pass `None` to explicitly clear all global tags.
        metadata: A dictionary of metadata to attach to all traced runs.

            Metadata can store any additional context about your runs.

            Pass `None` to explicitly clear all global metadata.

    Examples:
        Basic configuration:
        >>> import langsmith as ls
        >>> # Enable tracing with a specific project
        >>> ls.configure(enabled=True, project_name="my-project")

        Set global trace masking:
        >>> def hide_keys(data):
        ...     if not data:
        ...         return {}
        ...     return {k: v for k, v in data.items() if k not in ["key1", "key2"]}
        >>> ls.configure(
        ...     client=ls.Client(
        ...         hide_inputs=hide_keys,
        ...         hide_outputs=hide_keys,
        ...     )
        ... )

        Adding global tags and metadata:
        >>> ls.configure(
        ...     tags=["production", "v1.0"],
        ...     metadata={"environment": "prod", "version": "1.0.0"},
        ... )

        Disabling tracing:
        >>> ls.configure(enabled=False)
    """
    global _CLIENT
    with _LOCK:
        if client is not _SENTINEL:
            _CLIENT = client
        # Each remaining setting updates both a context variable in
        # ``_context`` and its matching module-level ``_GLOBAL_*`` attribute.
        settings = (
            (enabled, "_TRACING_ENABLED", "_GLOBAL_TRACING_ENABLED"),
            (project_name, "_PROJECT_NAME", "_GLOBAL_PROJECT_NAME"),
            (tags, "_TAGS", "_GLOBAL_TAGS"),
            (metadata, "_METADATA", "_GLOBAL_METADATA"),
        )
        for new_value, context_var_name, global_name in settings:
            if new_value is _SENTINEL:
                continue
            getattr(_context, context_var_name).set(new_value)
            setattr(_context, global_name, new_value)
def validate_extracted_usage_metadata(
    data: ls_schemas.ExtractedUsageMetadata,
) -> ls_schemas.ExtractedUsageMetadata:
    """Reject usage metadata that contains unrecognized keys.

    Args:
        data: Usage metadata mapping to check.

    Returns:
        The very same ``data`` object when every key is recognized.

    Raises:
        ValueError: If ``data`` has any key outside the permitted set.
    """
    permitted = frozenset(
        (
            "input_tokens",
            "output_tokens",
            "total_tokens",
            "input_token_details",
            "output_token_details",
            "input_cost",
            "output_cost",
            "total_cost",
            "input_cost_details",
            "output_cost_details",
        )
    )
    unexpected = set(data.keys()) - permitted
    if not unexpected:
        return data  # type: ignore
    raise ValueError(f"Unexpected keys in usage metadata: {unexpected}")
class RunTree(ls_schemas.RunBase):
    """Run Schema with back-references for posting runs.

    Holds a single run (span) plus the bookkeeping needed to post and patch
    it: dotted order, trace id, the client to use, and optional replicas.
    """

    name: str
    id: UUID = Field(default_factory=uuid7)
    run_type: str = Field(default="chain")
    start_time: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    # Note: no longer set.
    parent_run: Optional[RunTree] = Field(default=None, exclude=True)
    parent_dotted_order: Optional[str] = Field(default=None, exclude=True)
    child_runs: list[RunTree] = Field(
        default_factory=list,
        exclude={"__all__": {"parent_run_id"}},
    )
    session_name: str = Field(
        default_factory=lambda: utils.get_tracer_project() or "default",
        alias="project_name",
    )
    session_id: Optional[UUID] = Field(default=None, alias="project_id")
    extra: dict = Field(default_factory=dict)
    tags: Optional[list[str]] = Field(default_factory=list)
    events: list[dict] = Field(default_factory=list)
    """List of events associated with the run, like
    start and end events."""
    ls_client: Optional[Any] = Field(default=None, exclude=True)
    dotted_order: str = Field(
        default="", description="The order of the run in the tree."
    )
    trace_id: UUID = Field(default="", description="The trace id of the run.")  # type: ignore
    dangerously_allow_filesystem: Optional[bool] = Field(
        default=False, description="Whether to allow filesystem access for attachments."
    )
    replicas: Optional[Sequence[WriteReplica]] = Field(
        default=None,
        description="Projects to replicate this run to with optional updates.",
    )

    class Config:
        """Pydantic model configuration."""

        arbitrary_types_allowed = True
        allow_population_by_field_name = True
        extra = "ignore"

    @root_validator(pre=True)
    def infer_defaults(cls, values: dict) -> dict:
        """Fill in defaults on the raw input values before field validation.

        Derives ``name`` from ``serialized`` when absent, maps the ``client`` /
        ``_client`` aliases onto ``ls_client``, turns ``parent_run`` into a
        parent id and dotted order, generates ``id`` / ``trace_id`` when
        missing, and replaces ``None`` collections with empty ones.
        """
        if values.get("name") is None and values.get("serialized") is not None:
            if "name" in values["serialized"]:
                values["name"] = values["serialized"]["name"]
            elif "id" in values["serialized"]:
                values["name"] = values["serialized"]["id"][-1]
        if values.get("name") is None:
            values["name"] = "Unnamed"
        if "client" in values:  # Handle user-constructed clients
            values["ls_client"] = values.pop("client")
        elif "_client" in values:
            values["ls_client"] = values.pop("_client")
        if not values.get("ls_client"):
            values["ls_client"] = None
        # The parent object itself is not stored; only its id and dotted order.
        parent_run = values.pop("parent_run", None)
        if parent_run is not None:
            values["parent_run_id"] = parent_run.id
            values["parent_dotted_order"] = parent_run.dotted_order
        if "id" not in values:
            # Generate UUID from start_time if available
            if "start_time" in values and values["start_time"] is not None:
                values["id"] = uuid7_from_datetime(values["start_time"])
            else:
                values["id"] = uuid7()
        if "trace_id" not in values:
            if parent_run is not None:
                values["trace_id"] = parent_run.trace_id
            else:
                values["trace_id"] = values["id"]
        # Normalize an explicit ``extra=None`` to ``{}`` like the collections
        # below; methods such as ``end`` call ``self.extra.get`` directly.
        if values.get("extra") is None:
            values["extra"] = {}
        if values.get("events") is None:
            values["events"] = []
        if values.get("tags") is None:
            values["tags"] = []
        if values.get("outputs") is None:
            values["outputs"] = {}
        if values.get("attachments") is None:
            values["attachments"] = {}
        if values.get("replicas") is None:
            values["replicas"] = _REPLICAS.get()
        values["replicas"] = _ensure_write_replicas(values["replicas"])
        return values

    @root_validator(pre=False)
    def ensure_dotted_order(cls, values: dict) -> dict:
        """Ensure the dotted order of the run.

        A non-blank ``dotted_order`` is kept as-is. Otherwise one is built
        from ``start_time`` and ``id`` and appended to the parent's dotted
        order when there is one.
        """
        current_dotted_order = values.get("dotted_order")
        if current_dotted_order and current_dotted_order.strip():
            return values
        current_dotted_order = _create_current_dotted_order(
            values["start_time"], values["id"]
        )
        parent_dotted_order = values.get("parent_dotted_order")
        if parent_dotted_order is not None:
            values["dotted_order"] = parent_dotted_order + "." + current_dotted_order
        else:
            values["dotted_order"] = current_dotted_order
        return values

    @property
    def client(self) -> Client:
        """Return the client."""
        # Lazily load the client
        # If you never use this for API calls, it will never be loaded
        if self.ls_client is None:
            self.ls_client = get_cached_client()
        return self.ls_client

    @property
    def _client(self) -> Optional[Client]:
        # For backwards compat
        return self.ls_client

    def __setattr__(self, name, value):
        """Set the `_client` specially."""
        # For backwards compat
        if name == "_client":
            self.ls_client = value
        else:
            return super().__setattr__(name, value)

    def set(
        self,
        *,
        inputs: Optional[Mapping[str, Any]] = NOT_PROVIDED,
        outputs: Optional[Mapping[str, Any]] = NOT_PROVIDED,
        tags: Optional[Sequence[str]] = NOT_PROVIDED,
        metadata: Optional[Mapping[str, Any]] = NOT_PROVIDED,
        usage_metadata: Optional[ls_schemas.ExtractedUsageMetadata] = NOT_PROVIDED,
    ) -> None:
        """Set the inputs, outputs, tags, and metadata of the run.

        If performed, this will override the default behavior of the
        end() method to ignore new outputs (that would otherwise be added)
        by the @traceable decorator.

        If your LangChain or LangGraph versions are sufficiently up-to-date,
        this will also override the default behavior of `LangChainTracer`.

        Args:
            inputs: The inputs to set. `None` clears them.
            outputs: The outputs to set. `None` clears them.
            tags: The tags to set. `None` clears them.
            metadata: Metadata merged into the existing metadata.
            usage_metadata: Usage information to set.

        Returns:
            None
        """
        if tags is not NOT_PROVIDED:
            # ``None`` clears the tags instead of raising in ``list(None)``.
            self.tags = list(tags) if tags is not None else []
        if metadata is not NOT_PROVIDED:
            self.extra.setdefault("metadata", {}).update(metadata or {})
        if inputs is not NOT_PROVIDED:
            # Used by LangChain core to determine whether to
            # re-upload the inputs upon run completion
            self.extra["inputs_is_truthy"] = False
            if inputs is None:
                self.inputs = {}
            else:
                self.inputs = dict(inputs)
        if outputs is not NOT_PROVIDED:
            self.extra[OVERRIDE_OUTPUTS] = True
            if outputs is None:
                self.outputs = {}
            else:
                self.outputs = dict(outputs)
        if usage_metadata is not NOT_PROVIDED:
            self.extra.setdefault("metadata", {})["usage_metadata"] = (
                validate_extracted_usage_metadata(usage_metadata)
            )

    def add_tags(self, tags: Union[Sequence[str], str]) -> None:
        """Add tags to the run."""
        if isinstance(tags, str):
            tags = [tags]
        if self.tags is None:
            self.tags = []
        self.tags.extend(tags)

    def add_metadata(self, metadata: dict[str, Any]) -> None:
        """Add metadata to the run."""
        if self.extra is None:
            self.extra = {}
        metadata_: dict = cast(dict, self.extra).setdefault("metadata", {})
        metadata_.update(metadata)

    def add_outputs(self, outputs: dict[str, Any]) -> None:
        """Upsert the given outputs into the run.

        Args:
            outputs: A dictionary containing the outputs to be added.
        """
        if self.outputs is None:
            self.outputs = {}
        self.outputs.update(outputs)

    def add_inputs(self, inputs: dict[str, Any]) -> None:
        """Upsert the given inputs into the run.

        Args:
            inputs: A dictionary containing the inputs to be added.
        """
        if self.inputs is None:
            self.inputs = {}
        self.inputs.update(inputs)
        # Set to False so LangChain thinks it needs to
        # re-upload inputs
        self.extra["inputs_is_truthy"] = False

    def add_event(
        self,
        events: Union[
            ls_schemas.RunEvent,
            Sequence[ls_schemas.RunEvent],
            Sequence[dict],
            dict,
            str,
        ],
    ) -> None:
        """Add an event to the list of events.

        Args:
            events: The event(s) to be added. It can be a single event, a sequence
                of events, a sequence of dictionaries, a dictionary, or a string.
                A string is wrapped in an event named ``"event"`` stamped with
                the current UTC time.

        Returns:
            None
        """
        if self.events is None:
            self.events = []
        if isinstance(events, dict):
            self.events.append(events)  # type: ignore[arg-type]
        elif isinstance(events, str):
            self.events.append(
                {
                    "name": "event",
                    "time": datetime.now(timezone.utc).isoformat(),
                    "message": events,
                }
            )
        else:
            self.events.extend(events)  # type: ignore[arg-type]

    def end(
        self,
        *,
        outputs: Optional[dict] = None,
        error: Optional[str] = None,
        end_time: Optional[datetime] = None,
        events: Optional[Sequence[ls_schemas.RunEvent]] = None,
        metadata: Optional[dict[str, Any]] = None,
    ) -> None:
        """Set the end time of this run and record its final state.

        Only this run is modified; child runs are not touched.

        Args:
            outputs: Outputs merged into the run, unless outputs were already
                fixed via ``set(outputs=...)``.
            error: Error message to record.
            end_time: End time; defaults to the current UTC time.
            events: Events to append.
            metadata: Metadata merged into the run's metadata.
        """
        self.end_time = end_time or datetime.now(timezone.utc)
        # We've already 'set' the outputs, so ignore
        # the ones that are automatically included
        if not self.extra.get(OVERRIDE_OUTPUTS):
            if outputs is not None:
                if not self.outputs:
                    self.outputs = outputs
                else:
                    self.outputs.update(outputs)
        if error is not None:
            self.error = error
        if events is not None:
            self.add_event(events)
        if metadata is not None:
            self.add_metadata(metadata)

    def create_child(
        self,
        name: str,
        run_type: RUN_TYPE_T = "chain",
        *,
        run_id: Optional[ID_TYPE] = None,
        serialized: Optional[dict] = None,
        inputs: Optional[dict] = None,
        outputs: Optional[dict] = None,
        error: Optional[str] = None,
        reference_example_id: Optional[UUID] = None,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        tags: Optional[list[str]] = None,
        extra: Optional[dict] = None,
        attachments: Optional[ls_schemas.Attachments] = None,
    ) -> RunTree:
        """Create a child run of this run tree.

        The child inherits this run's project, replicas, client and
        filesystem setting. It is returned but not added to ``child_runs``.
        """
        serialized_ = serialized or {"name": name}
        run = RunTree(
            name=name,
            id=_ensure_uuid(run_id),
            serialized=serialized_,
            inputs=inputs or {},
            outputs=outputs or {},
            error=error,
            run_type=run_type,
            reference_example_id=reference_example_id,
            start_time=start_time or datetime.now(timezone.utc),
            end_time=end_time,
            extra=extra or {},
            parent_run=self,
            project_name=self.session_name,
            replicas=self.replicas,
            ls_client=self.ls_client,
            tags=tags,
            attachments=attachments or {},  # type: ignore
            dangerously_allow_filesystem=self.dangerously_allow_filesystem,
        )

        return run

    def _get_dicts_safe(self):
        """Serialize the run, moving ``Attachment`` inputs into attachments."""
        # Things like generators cannot be copied
        self_dict = self.dict(
            exclude={"child_runs", "inputs", "outputs"}, exclude_none=True
        )
        if self.inputs is not None:
            # shallow copy. deep copying will occur in the client
            inputs_ = {}
            attachments = self_dict.get("attachments", {})
            for k, v in self.inputs.items():
                if isinstance(v, ls_schemas.Attachment):
                    attachments[k] = v
                else:
                    inputs_[k] = v
            self_dict["inputs"] = inputs_
            if attachments:
                self_dict["attachments"] = attachments
        if self.outputs is not None:
            # shallow copy; deep copying will occur in the client
            self_dict["outputs"] = self.outputs.copy()
        return self_dict

    def _slice_parent_id(self, parent_id: str, run_dict: dict) -> None:
        """Slice the parent id from dotted order.

        Additionally check if the current run is a child of the parent. If so, update
        the parent_run_id to None, and set the trace id to the new root id after
        parent_id.
        """
        if dotted_order := run_dict.get("dotted_order"):
            segs = dotted_order.split(".")
            start_idx = None
            parent_id = str(parent_id)
            # TODO(angus): potentially use binary search to find the index
            for idx, part in enumerate(segs):
                seg_id = part[-TIMESTAMP_LENGTH:]
                if str(seg_id) == parent_id:
                    start_idx = idx
                    break
            if start_idx is not None:
                # Trim segments to start after parent_id (exclusive)
                trimmed_segs = segs[start_idx + 1 :]
                # Rebuild dotted_order
                run_dict["dotted_order"] = ".".join(trimmed_segs)
                if trimmed_segs:
                    run_dict["trace_id"] = UUID(trimmed_segs[0][-TIMESTAMP_LENGTH:])
                else:
                    run_dict["trace_id"] = run_dict["id"]
        if str(run_dict.get("parent_run_id")) == parent_id:
            # We've found the new root node.
            run_dict.pop("parent_run_id", None)

    def _remap_for_project(
        self, project_name: str, updates: Optional[dict] = None
    ) -> dict:
        """Rewrites ids/dotted_order for a given project with optional updates.

        Ids are remapped deterministically with ``uuid5`` over
        ``"<old id>:<project_name>"``, so every run of a trace maps
        consistently. When ``project_name`` equals this run's project the
        serialized run is returned unchanged (``updates`` are not applied).
        """
        run_dict = self._get_dicts_safe()
        if project_name == self.session_name:
            return run_dict

        if updates and updates.get("reroot", False):
            distributed_parent_id = _DISTRIBUTED_PARENT_ID.get()
            if distributed_parent_id:
                self._slice_parent_id(distributed_parent_id, run_dict)

        old_id = run_dict["id"]
        new_id = uuid5(NAMESPACE_DNS, f"{old_id}:{project_name}")
        # trace id
        old_trace = run_dict.get("trace_id")
        if old_trace:
            new_trace = uuid5(NAMESPACE_DNS, f"{old_trace}:{project_name}")
        else:
            new_trace = None
        # parent id
        parent = run_dict.get("parent_run_id")
        if parent:
            new_parent = uuid5(NAMESPACE_DNS, f"{parent}:{project_name}")
        else:
            new_parent = None
        # dotted order
        if run_dict.get("dotted_order"):
            segs = run_dict["dotted_order"].split(".")
            rebuilt = []
            for part in segs[:-1]:
                repl = uuid5(
                    NAMESPACE_DNS, f"{part[-TIMESTAMP_LENGTH:]}:{project_name}"
                )
                rebuilt.append(part[:-TIMESTAMP_LENGTH] + str(repl))
            rebuilt.append(segs[-1][:-TIMESTAMP_LENGTH] + str(new_id))
            dotted = ".".join(rebuilt)
        else:
            dotted = None
        dup = utils.deepish_copy(run_dict)
        dup.update(
            {
                "id": new_id,
                "trace_id": new_trace,
                "parent_run_id": new_parent,
                "dotted_order": dotted,
                "session_name": project_name,
            }
        )
        if updates:
            dup.update(updates)
        return dup

    def post(self, exclude_child_runs: bool = True) -> None:
        """Post the run tree to the API asynchronously.

        With replicas configured, one create request is sent per replica
        (remapped to that replica's project); otherwise a single request is
        sent. Attachment names are then recorded in an
        ``uploaded_attachment`` event so ``patch`` does not upload them again.
        """
        if self.replicas:
            for replica in self.replicas:
                project_name = replica.get("project_name") or self.session_name
                updates = replica.get("updates")
                run_dict = self._remap_for_project(project_name, updates)
                self.client.create_run(
                    **run_dict,
                    api_key=replica.get("api_key"),
                    api_url=replica.get("api_url"),
                )
        else:
            kwargs = self._get_dicts_safe()
            self.client.create_run(**kwargs)
        if self.attachments:
            keys = [str(name) for name in self.attachments]
            self.events.append(
                {
                    "name": "uploaded_attachment",
                    "time": datetime.now(timezone.utc).isoformat(),
                    "message": set(keys),
                }
            )
        if not exclude_child_runs:
            for child_run in self.child_runs:
                child_run.post(exclude_child_runs=False)

    def patch(self, *, exclude_inputs: bool = False) -> None:
        """Patch the run tree to the API in a background thread.

        Calls ``end()`` first if the run has no end time yet.

        Args:
            exclude_inputs: Whether to exclude inputs from the patch request.
        """
        if not self.end_time:
            self.end()
        attachments = {
            a: v for a, v in self.attachments.items() if isinstance(v, tuple)
        }
        try:
            # Avoid loading the same attachment twice
            if attachments:
                uploaded = next(
                    (
                        ev
                        for ev in self.events
                        if ev.get("name") == "uploaded_attachment"
                    ),
                    None,
                )
                if uploaded:
                    attachments = {
                        a: v
                        for a, v in attachments.items()
                        if a not in uploaded["message"]
                    }
        except Exception as e:
            logger.warning(f"Error filtering attachments to upload: {e}")
        if self.replicas:
            for replica in self.replicas:
                project_name = replica.get("project_name") or self.session_name
                updates = replica.get("updates")
                run_dict = self._remap_for_project(project_name, updates)
                self.client.update_run(
                    name=run_dict["name"],
                    run_id=run_dict["id"],
                    run_type=run_dict.get("run_type"),
                    start_time=run_dict.get("start_time"),
                    # ``inputs``/``outputs`` are absent from the serialized
                    # dict when they are None, so use ``.get`` here.
                    inputs=None if exclude_inputs else run_dict.get("inputs"),
                    outputs=run_dict.get("outputs"),
                    error=run_dict.get("error"),
                    parent_run_id=run_dict.get("parent_run_id"),
                    session_name=run_dict.get("session_name"),
                    reference_example_id=run_dict.get("reference_example_id"),
                    end_time=run_dict.get("end_time"),
                    dotted_order=run_dict.get("dotted_order"),
                    trace_id=run_dict.get("trace_id"),
                    events=run_dict.get("events"),
                    tags=run_dict.get("tags"),
                    extra=run_dict.get("extra"),
                    attachments=attachments,
                    api_key=replica.get("api_key"),
                    api_url=replica.get("api_url"),
                )
        else:
            self.client.update_run(
                name=self.name,
                run_id=self.id,
                run_type=cast(RUN_TYPE_T, self.run_type),
                start_time=self.start_time,
                inputs=(
                    None
                    if exclude_inputs
                    else (self.inputs.copy() if self.inputs else None)
                ),
                outputs=self.outputs.copy() if self.outputs else None,
                error=self.error,
                parent_run_id=self.parent_run_id,
                session_name=self.session_name,
                reference_example_id=self.reference_example_id,
                end_time=self.end_time,
                dotted_order=self.dotted_order,
                trace_id=self.trace_id,
                events=self.events,
                tags=self.tags,
                extra=self.extra,
                attachments=attachments,
            )

    def wait(self) -> None:
        """Wait for all `_futures` to complete.

        This is currently a no-op.
        """
        pass

    def get_url(self) -> str:
        """Return the URL of the run."""
        return self.client.get_run_url(run=self)

    @classmethod
    def from_dotted_order(
        cls,
        dotted_order: str,
        **kwargs: Any,
    ) -> RunTree:
        """Create a new 'child' span from the provided dotted order.

        Returns:
            RunTree: The new span.
        """
        headers = {
            LANGSMITH_DOTTED_ORDER: dotted_order,
        }
        return cast(RunTree, cls.from_headers(headers, **kwargs))  # type: ignore[arg-type]

    @classmethod
    def from_runnable_config(
        cls,
        config: Optional[dict],
        **kwargs: Any,
    ) -> Optional[RunTree]:
        """Create a new 'child' span from the provided runnable config.

        Requires `langchain` to be installed.

        Returns:
            The new span or `None` if no parent span information is found.
        """
        try:
            from langchain_core.callbacks.manager import (
                AsyncCallbackManager,
                CallbackManager,
            )
            from langchain_core.runnables import RunnableConfig, ensure_config
            from langchain_core.tracers.langchain import LangChainTracer
        except ImportError as e:
            raise ImportError(
                "RunTree.from_runnable_config requires langchain-core to be installed. "
                "You can install it with `pip install langchain-core`."
            ) from e
        if config is None:
            config_ = ensure_config(
                cast(RunnableConfig, config) if isinstance(config, dict) else None
            )
        else:
            config_ = cast(RunnableConfig, config)

        if (
            (cb := config_.get("callbacks"))
            and isinstance(cb, (CallbackManager, AsyncCallbackManager))
            and cb.parent_run_id
            and (
                tracer := next(
                    (t for t in cb.handlers if isinstance(t, LangChainTracer)),
                    None,
                )
            )
        ):
            if (run := tracer.run_map.get(str(cb.parent_run_id))) and run.dotted_order:
                dotted_order = run.dotted_order
                kwargs["run_type"] = run.run_type
                kwargs["inputs"] = run.inputs
                kwargs["outputs"] = run.outputs
                kwargs["start_time"] = run.start_time
                kwargs["end_time"] = run.end_time
                # Union of the parent run's tags and the caller's tags. The
                # previous ``run.tags or [] + kwargs.get(...)`` dropped the
                # caller's tags whenever the parent run had any.
                caller_tags = list(kwargs.get("tags") or [])
                kwargs["tags"] = sorted(set((run.tags or []) + caller_tags))
                kwargs["name"] = run.name
                extra_ = kwargs.setdefault("extra", {})
                metadata_ = extra_.setdefault("metadata", {})
                metadata_.update(run.metadata)
            elif hasattr(tracer, "order_map") and cb.parent_run_id in tracer.order_map:
                dotted_order = tracer.order_map[cb.parent_run_id][1]
            else:
                return None
            kwargs["client"] = tracer.client
            kwargs["project_name"] = tracer.project_name
            return RunTree.from_dotted_order(dotted_order, **kwargs)
        return None

    @classmethod
    def from_headers(
        cls, headers: Mapping[Union[str, bytes], Union[str, bytes]], **kwargs: Any
    ) -> Optional[RunTree]:
        """Create a new 'parent' span from the provided headers.

        Extracts parent span information from the headers and creates a new span.

        Metadata and tags are extracted from the baggage header.

        The dotted order and trace id are extracted from the trace header.

        Also stores the new span's id in ``_DISTRIBUTED_PARENT_ID`` for the
        current context (used when rerooting replicated runs).

        Returns:
            The new span or `None` if no parent span information is found.
        """
        init_args = kwargs.copy()

        langsmith_trace = cast(Optional[str], headers.get(LANGSMITH_DOTTED_ORDER))
        if not langsmith_trace:
            langsmith_trace_bytes = cast(
                Optional[bytes], headers.get(LANGSMITH_DOTTED_ORDER_BYTES)
            )
            if not langsmith_trace_bytes:
                return  # type: ignore[return-value]
            langsmith_trace = langsmith_trace_bytes.decode("utf-8")

        parent_dotted_order = langsmith_trace.strip()
        parsed_dotted_order = _parse_dotted_order(parent_dotted_order)
        trace_id = parsed_dotted_order[0][1]
        init_args["trace_id"] = trace_id
        init_args["id"] = parsed_dotted_order[-1][1]
        init_args["dotted_order"] = parent_dotted_order
        if len(parsed_dotted_order) >= 2:
            # Has a parent
            init_args["parent_run_id"] = parsed_dotted_order[-2][1]
        # All placeholders. We assume the source process
        # handles the life-cycle of the run.
        init_args["start_time"] = init_args.get("start_time") or datetime.now(
            timezone.utc
        )
        init_args["run_type"] = init_args.get("run_type") or "chain"
        init_args["name"] = init_args.get("name") or "parent"

        baggage = _Baggage.from_headers(headers)
        if baggage.metadata or baggage.tags:
            init_args["extra"] = init_args.setdefault("extra", {})
            init_args["extra"]["metadata"] = init_args["extra"].setdefault(
                "metadata", {}
            )
            metadata = {**baggage.metadata, **init_args["extra"]["metadata"]}
            init_args["extra"]["metadata"] = metadata
            # Tolerate an explicit ``tags=None`` from the caller.
            caller_tags = list(init_args.get("tags") or [])
            tags = sorted(set(baggage.tags + caller_tags))
            init_args["tags"] = tags
        if baggage.project_name:
            init_args["project_name"] = baggage.project_name
        if baggage.replicas:
            init_args["replicas"] = baggage.replicas

        run_tree = RunTree(**init_args)

        # Set the distributed parent ID to this run's ID for rerooting
        _DISTRIBUTED_PARENT_ID.set(str(run_tree.id))

        return run_tree

    def to_headers(self) -> dict[str, str]:
        """Return the `RunTree` as a dictionary of headers.

        Produces the dotted-order trace header (when a trace id is set) and a
        ``baggage`` header carrying metadata, tags, project and replicas.
        """
        headers = {}
        if self.trace_id:
            headers[f"{LANGSMITH_DOTTED_ORDER}"] = self.dotted_order
        baggage = _Baggage(
            metadata=self.extra.get("metadata", {}),
            tags=self.tags,
            project_name=self.session_name,
            replicas=self.replicas,
        )
        headers["baggage"] = baggage.to_header()
        return headers

    def __repr__(self):
        """Return a string representation of the `RunTree` object."""
        return (
            f"RunTree(id={self.id}, name='{self.name}', "
            f"run_type='{self.run_type}', dotted_order='{self.dotted_order}')"
        )
class _Baggage:
    """Baggage header information.

    Carries LangSmith metadata, tags, project name and write replicas inside
    a W3C-style ``baggage`` header of comma-separated ``key=value`` members
    whose values are percent-encoded.
    """

    def __init__(
        self,
        metadata: Optional[dict[str, str]] = None,
        tags: Optional[list[str]] = None,
        project_name: Optional[str] = None,
        replicas: Optional[Sequence[WriteReplica]] = None,
    ):
        """Initialize the Baggage object."""
        self.metadata = metadata or {}
        self.tags = tags or []
        self.project_name = project_name
        self.replicas = replicas or []

    @classmethod
    def from_header(cls, header_value: Optional[str]) -> _Baggage:
        """Create a Baggage object from the given header value.

        Unknown keys are ignored. Members that are not ``key=value`` pairs are
        skipped, and whitespace around keys and values is stripped, so baggage
        entries from other producers do not prevent the LangSmith entries from
        being read. Errors decoding a LangSmith value are logged and stop
        further parsing; fields parsed so far are kept.
        """
        if not header_value:
            return cls()
        metadata = {}
        tags = []
        project_name = None
        replicas: Optional[list[WriteReplica]] = None
        try:
            for item in header_value.split(","):
                key, sep, value = item.partition("=")
                if not sep:
                    # Not a key=value member (e.g. empty or foreign entry).
                    continue
                key = key.strip()
                value = value.strip()
                if key == LANGSMITH_METADATA:
                    metadata = json.loads(urllib.parse.unquote(value))
                elif key == LANGSMITH_TAGS:
                    tags = urllib.parse.unquote(value).split(",")
                elif key == LANGSMITH_PROJECT:
                    project_name = urllib.parse.unquote(value)
                elif key == LANGSMITH_REPLICAS:
                    replicas_data = json.loads(urllib.parse.unquote(value))
                    parsed_replicas: list[WriteReplica] = []
                    for replica_item in replicas_data:
                        if (
                            isinstance(replica_item, (tuple, list))
                            and len(replica_item) == 2
                        ):
                            # Convert legacy format to WriteReplica
                            parsed_replicas.append(
                                WriteReplica(
                                    api_url=None,
                                    api_key=None,
                                    project_name=str(replica_item[0]),
                                    updates=replica_item[1],
                                )
                            )
                        elif isinstance(replica_item, dict):
                            # New WriteReplica format: preserve as dict
                            parsed_replicas.append(cast(WriteReplica, replica_item))
                        else:
                            logger.warning(
                                f"Unknown replica format in baggage: {replica_item}"
                            )
                    replicas = parsed_replicas
        except Exception as e:
            logger.warning(f"Error parsing baggage header: {e}")

        return cls(
            metadata=metadata, tags=tags, project_name=project_name, replicas=replicas
        )

    @classmethod
    def from_headers(cls, headers: Mapping[Union[str, bytes], Any]) -> _Baggage:
        """Create a Baggage object from a header mapping.

        Looks up ``"baggage"`` first, then ``b"baggage"`` (decoded as UTF-8).
        Returns an empty Baggage when neither key is present.
        """
        if "baggage" in headers:
            return cls.from_header(headers["baggage"])
        elif b"baggage" in headers:
            return cls.from_header(cast(bytes, headers[b"baggage"]).decode("utf-8"))
        else:
            return cls.from_header(None)

    def to_header(self) -> str:
        """Return the Baggage object as a header value.

        Empty fields are omitted; each value is percent-encoded so it cannot
        contain the ``,`` member separator.
        """
        items = []
        if self.metadata:
            serialized_metadata = _dumps_json(self.metadata)
            items.append(
                f"{LANGSMITH_PREFIX}metadata={urllib.parse.quote(serialized_metadata)}"
            )
        if self.tags:
            serialized_tags = ",".join(self.tags)
            items.append(
                f"{LANGSMITH_PREFIX}tags={urllib.parse.quote(serialized_tags)}"
            )
        if self.project_name:
            items.append(
                f"{LANGSMITH_PREFIX}project={urllib.parse.quote(self.project_name)}"
            )
        if self.replicas:
            serialized_replicas = _dumps_json(self.replicas)
            items.append(
                f"{LANGSMITH_PREFIX}replicas={urllib.parse.quote(serialized_replicas)}"
            )
        return ",".join(items)
def _replicas_from_list_format(entries: list) -> list[WriteReplica]:
    """Build replicas from the ``[{"api_url": ..., "api_key": ...}]`` format.

    Entries that are not dicts, or whose ``api_url``/``api_key`` are not
    strings, are logged and skipped. Trailing slashes are stripped from URLs.
    """
    result: list[WriteReplica] = []
    for entry in entries:
        if not isinstance(entry, dict):
            logger.warning(
                f"Invalid item type in LANGSMITH_RUNS_ENDPOINTS: "
                f"expected dict, got {type(entry).__name__}"
            )
            continue

        endpoint = entry.get("api_url")
        secret = entry.get("api_key")

        if not isinstance(endpoint, str):
            logger.warning(
                f"Invalid api_url type in LANGSMITH_RUNS_ENDPOINTS: "
                f"expected string, got {type(endpoint).__name__}"
            )
            continue
        if not isinstance(secret, str):
            logger.warning(
                f"Invalid api_key type in LANGSMITH_RUNS_ENDPOINTS: "
                f"expected string, got {type(secret).__name__}"
            )
            continue

        result.append(
            WriteReplica(
                api_url=endpoint.rstrip("/"),
                api_key=secret,
                project_name=None,
                updates=None,
            )
        )
    return result


def _replicas_from_mapping_format(mapping: dict) -> list[WriteReplica]:
    """Build replicas from the ``{"<url>": "<api key>"}`` format.

    URLs have trailing slashes stripped. Non-string keys are logged and
    skipped.
    """
    result: list[WriteReplica] = []
    for raw_url, secret in mapping.items():
        endpoint = raw_url.rstrip("/")
        if not isinstance(secret, str):
            logger.warning(
                f"Invalid value type in LANGSMITH_RUNS_ENDPOINTS for URL "
                f"{endpoint}: "
                f"expected string, got {type(secret).__name__}"
            )
            continue
        result.append(
            WriteReplica(
                api_url=endpoint,
                api_key=secret,
                project_name=None,
                updates=None,
            )
        )
    return result


@functools.lru_cache(maxsize=1)
def _parse_write_replicas_from_env_var(env_var: Optional[str]) -> list[WriteReplica]:
    """Parse the raw value of the LANGSMITH_RUNS_ENDPOINTS environment variable.

    Two JSON shapes are accepted:

    * a list of objects, e.g. ``[{"api_url": "x", "api_key": "y"}]``
    * an object mapping URL to API key, e.g. ``{"url": "key"}``

    Results are memoized by ``functools.lru_cache(maxsize=1)``, so repeated
    calls with the same value return the same list object.

    Returns an empty list for a missing or empty value, for JSON of any other
    shape, or when parsing fails; the last two cases are logged as warnings.

    Raises:
        utils.LangSmithUserError: Propagated from ``_check_endpoint_env_unset``
            when the object format is combined with a single-endpoint env var.
    """
    if not env_var:
        return []

    try:
        decoded = json.loads(env_var)

        if isinstance(decoded, list):
            return _replicas_from_list_format(decoded)

        if isinstance(decoded, dict):
            # The object format must not be mixed with LANGSMITH_ENDPOINT /
            # LANGCHAIN_ENDPOINT; this raises LangSmithUserError if it is.
            _check_endpoint_env_unset(decoded)
            return _replicas_from_mapping_format(decoded)

        logger.warning(
            f"Invalid LANGSMITH_RUNS_ENDPOINTS – must be valid JSON list of "
            "objects with api_url and api_key properties, or object mapping "
            f"url->apiKey, got {type(decoded).__name__}"
        )
        return []
    except utils.LangSmithUserError:
        # Configuration conflicts are user errors: surface them.
        raise
    except Exception as e:
        logger.warning(
            "Invalid LANGSMITH_RUNS_ENDPOINTS – must be valid JSON list of "
            f"objects with api_url and api_key properties, or object mapping"
            f" url->apiKey: {e}"
        )
        return []
def _get_write_replicas_from_env() -> list[WriteReplica]:
    """Get write replicas from the LANGSMITH_RUNS_ENDPOINTS environment variable.

    ``_parse_write_replicas_from_env_var`` is memoized with
    ``functools.lru_cache``, so it hands back the *same* list and replica
    dicts on every call. Shallow copies are returned here so that a caller
    mutating the returned list, or one of its replica dicts, cannot corrupt
    the cached value seen by later calls.
    """
    env_var = utils.get_env_var("RUNS_ENDPOINTS")
    cached = _parse_write_replicas_from_env_var(env_var)
    return [replica.copy() for replica in cached]
def _check_endpoint_env_unset(parsed: dict[str, str]) -> None:
    """Reject configs that mix a single endpoint with LANGSMITH_RUNS_ENDPOINTS.

    Does nothing when ``parsed`` is empty.

    Raises:
        utils.LangSmithUserError: If ``parsed`` is non-empty and either
            ``LANGSMITH_ENDPOINT`` or ``LANGCHAIN_ENDPOINT`` is set to a
            non-empty value.
    """
    import os

    if not parsed:
        # No runs endpoints configured, so nothing can conflict.
        return

    conflicting = os.getenv("LANGSMITH_ENDPOINT") or os.getenv("LANGCHAIN_ENDPOINT")
    if conflicting:
        raise utils.LangSmithUserError(
            "You cannot provide both LANGSMITH_ENDPOINT / LANGCHAIN_ENDPOINT "
            "and LANGSMITH_RUNS_ENDPOINTS."
        )
def _ensure_write_replicas(
    replicas: Optional[Sequence[WriteReplica]],
) -> list[WriteReplica]:
    """Normalize an optional replica sequence into a list of WriteReplica dicts.

    ``None`` means "not specified": the replicas configured through the
    LANGSMITH_RUNS_ENDPOINTS environment variable are used instead. Any other
    sequence is expected to already hold WriteReplica dicts and is copied into
    a new list.
    """
    if replicas is not None:
        return list(replicas)
    return _get_write_replicas_from_env()
def _parse_dotted_order(dotted_order: str) -> list[tuple[datetime, UUID]]:
    """Split a dotted-order string into ``(start_time, run_id)`` pairs.

    Segments are separated by ``"."``. Within each segment, the last
    ``TIMESTAMP_LENGTH`` characters are parsed as a UUID, and everything
    before them as a timestamp in ``%Y%m%dT%H%M%S%fZ`` format.

    The trailing ``Z`` is matched literally, so the returned datetimes are
    naive (no ``tzinfo``). A malformed segment raises ``ValueError`` from
    ``strptime`` or ``UUID``.
    """
    pairs: list[tuple[datetime, UUID]] = []
    for segment in dotted_order.split("."):
        stamp_text = segment[:-TIMESTAMP_LENGTH]
        id_text = segment[-TIMESTAMP_LENGTH:]
        pairs.append(
            (datetime.strptime(stamp_text, "%Y%m%dT%H%M%S%fZ"), UUID(id_text))
        )
    return pairs
# Module-level client handle. This binds whatever object
# ``_context._GLOBAL_CLIENT`` refers to when this module is imported; a later
# rebinding of ``_context._GLOBAL_CLIENT`` is not reflected in ``_CLIENT``.
_CLIENT: Optional[Client] = _context._GLOBAL_CLIENT
# Names exported by ``from langsmith.run_trees import *`` (each listed once).
__all__ = ["RunTree"]
def _create_current_dotted_order(
    start_time: Optional[datetime], run_id: Optional[UUID]
) -> str:
    """Create the dotted-order segment for a single run.

    The segment is the start time rendered as ``%Y%m%dT%H%M%S%fZ``,
    immediately followed by the run id, e.g.
    ``20240102T030405000006Z12345678-1234-5678-1234-567812345678``.

    Args:
        start_time: When the run started. Defaults to the current UTC time.
            Timezone-aware values are converted to UTC before formatting.
            Naive values are formatted unchanged (presumably already UTC).
        run_id: The run's id. When omitted, one is derived from the start
            time via ``uuid7_from_datetime``.

    Returns:
        The timestamp string concatenated with ``str(run_id)``.
    """
    st = start_time or datetime.now(timezone.utc)
    if st.utcoffset() is not None:
        # strftime prints the wall-clock fields of the datetime's own zone.
        # Without normalizing, a non-UTC aware start_time would be written
        # with a literal "Z" (UTC) suffix and mis-sort against UTC segments.
        # astimezone keeps the same instant, so the uuid7 derived below is
        # unaffected.
        st = st.astimezone(timezone.utc)
    id_ = run_id or uuid7_from_datetime(st)
    return st.strftime("%Y%m%dT%H%M%S%fZ") + str(id_)