Coverage for langsmith/run_helpers.py: 23%
791 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-12-11 16:15 -0800
« prev ^ index » next coverage.py v7.10.1, created at 2025-12-11 16:15 -0800
1"""Decorator for creating a run tree from functions."""
3from __future__ import annotations
5import asyncio
6import contextlib
7import contextvars
8import datetime
9import functools
10import inspect
11import logging
12import warnings
13from collections.abc import (
14 AsyncGenerator,
15 AsyncIterator,
16 Awaitable,
17 Generator,
18 Iterator,
19 Mapping,
20 Sequence,
21)
22from contextvars import copy_context
23from typing import (
24 TYPE_CHECKING,
25 Annotated,
26 Any,
27 Callable,
28 Generic,
29 Literal,
30 Optional,
31 Protocol,
32 TypedDict,
33 TypeVar,
34 Union,
35 cast,
36 get_type_hints,
37 overload,
38 runtime_checkable,
39)
41from typing_extensions import ParamSpec, TypeGuard, get_args, get_origin
43import langsmith._internal._context as _context
44from langsmith import client as ls_client
45from langsmith import run_trees, schemas, utils
46from langsmith._internal import _aiter as aitertools
47from langsmith.env import _runtime_env
48from langsmith.run_trees import WriteReplica
50if TYPE_CHECKING:
51 from types import TracebackType
53 from langchain_core.runnables import Runnable
55LOGGER = logging.getLogger(__name__)
56_CONTEXT_KEYS: dict[str, contextvars.ContextVar] = {
57 "parent": _context._PARENT_RUN_TREE,
58 "project_name": _context._PROJECT_NAME,
59 "tags": _context._TAGS,
60 "metadata": _context._METADATA,
61 "enabled": _context._TRACING_ENABLED,
62 "client": _context._CLIENT,
63 "replicas": run_trees._REPLICAS,
64 "distributed_parent_id": run_trees._DISTRIBUTED_PARENT_ID,
65}
67_EXCLUDED_FRAME_FNAME = "langsmith/run_helpers.py"
69_OTEL_AVAILABLE: Optional[bool] = None
72def get_current_run_tree() -> Optional[run_trees.RunTree]:
73 """Get the current run tree."""
74 return _context._PARENT_RUN_TREE.get()
77def set_run_metadata(**metadata: Any) -> None:
78 """Update metadata on the current run tree."""
79 run_tree = get_current_run_tree()
80 if run_tree is None:
81 LOGGER.warning(
82 "No active run tree found. Call `set_run_metadata` inside a traced run."
83 )
84 else:
85 run_tree.metadata.update(metadata)
86 return
89def get_tracing_context(
90 context: Optional[contextvars.Context] = None,
91) -> dict[str, Any]:
92 """Get the current tracing context."""
93 if context is None:
94 return {
95 "parent": _context._PARENT_RUN_TREE.get(),
96 "project_name": _context._PROJECT_NAME.get(),
97 "tags": _context._TAGS.get(),
98 "metadata": _context._METADATA.get(),
99 "enabled": _context._TRACING_ENABLED.get(),
100 "client": _context._CLIENT.get(),
101 "replicas": run_trees._REPLICAS.get(),
102 "distributed_parent_id": run_trees._DISTRIBUTED_PARENT_ID.get(),
103 }
104 return {k: context.get(v) for k, v in _CONTEXT_KEYS.items()}
107@contextlib.contextmanager
108def tracing_context(
109 *,
110 project_name: Optional[str] = None,
111 tags: Optional[list[str]] = None,
112 metadata: Optional[dict[str, Any]] = None,
113 parent: Optional[Union[run_trees.RunTree, Mapping, str, Literal[False]]] = None,
114 enabled: Optional[Union[bool, Literal["local"]]] = None,
115 client: Optional[ls_client.Client] = None,
116 replicas: Optional[Sequence[WriteReplica]] = None,
117 distributed_parent_id: Optional[str] = None,
118 **kwargs: Any,
119) -> Generator[None, None, None]:
120 """Set the tracing context for a block of code.
122 Args:
123 project_name: The name of the project to log the run to.
124 tags: The tags to add to the run.
125 metadata: The metadata to add to the run.
126 parent: The parent run to use for the context.
128 Can be a Run/`RunTree` object, request headers (for distributed tracing),
129 or the dotted order string.
130 client: The client to use for logging the run to LangSmith.
131 enabled: Whether tracing is enabled.
133 Defaults to `None`, meaning it will use the current context value or environment variables.
134 replicas: A sequence of `WriteReplica` dictionaries to send runs to.
136 Example: `[{"api_url": "https://api.example.com", "api_key": "key", "project_name": "proj"}]`
137 or `[{"project_name": "my_experiment", "updates": {"reference_example_id": None}}]`
138 distributed_parent_id: The distributed parent ID for distributed tracing. Defaults to None.
139 """
140 if kwargs:
141 # warn
142 warnings.warn(
143 f"Unrecognized keyword arguments: {kwargs}.",
144 DeprecationWarning,
145 )
146 current_context = get_tracing_context()
147 parent_run = (
148 _get_parent_run({"parent": parent or kwargs.get("parent_run")})
149 if parent is not False
150 else None
151 )
152 distributed_parent_id_to_use = distributed_parent_id
153 if distributed_parent_id_to_use is None and parent_run is not None:
154 # TODO(angus): decide if we want to merge tags and metadata
155 tags = sorted(set(tags or []) | set(parent_run.tags or []))
156 metadata = {**parent_run.metadata, **(metadata or {})}
157 distributed_parent_id_to_use = parent_run.id # type: ignore[assignment]
158 enabled = enabled if enabled is not None else current_context.get("enabled")
159 _set_tracing_context(
160 {
161 "parent": parent_run,
162 "project_name": project_name,
163 "tags": tags,
164 "metadata": metadata,
165 "enabled": enabled,
166 "client": client,
167 "replicas": replicas,
168 "distributed_parent_id": distributed_parent_id_to_use,
169 }
170 )
171 try:
172 yield
173 finally:
174 _set_tracing_context(current_context)
177# Alias for backwards compatibility
178get_run_tree_context = get_current_run_tree
181def is_traceable_function(func: Any) -> TypeGuard[SupportsLangsmithExtra[P, R]]:
182 """Check if a function is `@traceable` decorated."""
183 return (
184 _is_traceable_function(func)
185 or (isinstance(func, functools.partial) and _is_traceable_function(func.func))
186 or (hasattr(func, "__call__") and _is_traceable_function(func.__call__))
187 )
190def ensure_traceable(
191 func: Callable[P, R],
192 *,
193 name: Optional[str] = None,
194 metadata: Optional[Mapping[str, Any]] = None,
195 tags: Optional[list[str]] = None,
196 client: Optional[ls_client.Client] = None,
197 reduce_fn: Optional[Callable[[Sequence], Union[dict, str]]] = None,
198 project_name: Optional[str] = None,
199 process_inputs: Optional[Callable[[dict], dict]] = None,
200 process_outputs: Optional[Callable[..., dict]] = None,
201 process_chunk: Optional[Callable] = None,
202) -> SupportsLangsmithExtra[P, R]:
203 """Ensure that a function is traceable."""
204 if is_traceable_function(func):
205 return func
206 return traceable(
207 name=name,
208 metadata=metadata,
209 tags=tags,
210 client=client,
211 reduce_fn=reduce_fn,
212 project_name=project_name,
213 process_inputs=process_inputs,
214 process_outputs=process_outputs,
215 process_chunk=process_chunk,
216 )(func)
219def is_async(func: Callable) -> bool:
220 """Inspect function or wrapped function to see if it is async."""
221 return inspect.iscoroutinefunction(func) or (
222 hasattr(func, "__wrapped__") and inspect.iscoroutinefunction(func.__wrapped__)
223 )
226class LangSmithExtra(TypedDict, total=False):
227 """Any additional info to be injected into the run dynamically."""
229 name: Optional[str]
230 """Optional name for the run."""
231 reference_example_id: Optional[ls_client.ID_TYPE]
232 """Optional ID of a reference example."""
233 run_extra: Optional[dict]
234 """Optional additional run information."""
235 parent: Optional[Union[run_trees.RunTree, str, Mapping]]
236 """Optional parent run, can be a RunTree, string, or mapping."""
237 run_tree: Optional[run_trees.RunTree] # TODO: Deprecate
238 """Optional run tree (deprecated)."""
239 project_name: Optional[str]
240 """Optional name of the project."""
241 metadata: Optional[dict[str, Any]]
242 """Optional metadata for the run."""
243 tags: Optional[list[str]]
244 """Optional list of tags for the run."""
245 run_id: Optional[ls_client.ID_TYPE]
246 """Optional ID for the run."""
247 client: Optional[ls_client.Client]
248 """Optional LangSmith client."""
249 # Optional callback function to be called if the run succeeds and before it is sent.
250 _on_success: Optional[Callable[[run_trees.RunTree], None]]
251 on_end: Optional[Callable[[run_trees.RunTree], Any]]
252 """Optional callback function to be called after the run ends and is sent."""
255R = TypeVar("R", covariant=True)
256P = ParamSpec("P")
259@runtime_checkable
260class SupportsLangsmithExtra(Protocol, Generic[P, R]):
261 """Implementations of this Protocol accept an optional langsmith_extra parameter."""
263 def __call__( # type: ignore[valid-type]
264 self,
265 *args: P.args,
266 langsmith_extra: Optional[LangSmithExtra] = None,
267 **kwargs: P.kwargs,
268 ) -> R:
269 """Call the instance when it is called as a function.
271 Args:
272 *args: Variable length argument list.
273 langsmith_extra: Optional dictionary containing additional
274 parameters specific to Langsmith.
275 **kwargs: Arbitrary keyword arguments.
277 Returns:
278 R: The return value of the method.
280 """
281 ...
284def _extract_usage(
285 *,
286 run_tree: run_trees.RunTree,
287 outputs: Optional[dict] = None,
288 **kwargs: Any,
289) -> Optional[schemas.ExtractedUsageMetadata]:
290 from_metadata = (run_tree.metadata or {}).get("usage_metadata")
291 return (outputs or {}).get("usage_metadata") or from_metadata
294@overload
295def traceable(
296 func: Callable[P, R],
297) -> SupportsLangsmithExtra[P, R]: ...
300@overload
301def traceable(
302 run_type: ls_client.RUN_TYPE_T = "chain",
303 *,
304 name: Optional[str] = None,
305 metadata: Optional[Mapping[str, Any]] = None,
306 tags: Optional[list[str]] = None,
307 client: Optional[ls_client.Client] = None,
308 reduce_fn: Optional[Callable[[Sequence], Union[dict, str]]] = None,
309 project_name: Optional[str] = None,
310 process_inputs: Optional[Callable[[dict], dict]] = None,
311 process_outputs: Optional[Callable[..., dict]] = None,
312 process_chunk: Optional[Callable] = None,
313 _invocation_params_fn: Optional[Callable[[dict], dict]] = None,
314 dangerously_allow_filesystem: bool = False,
315) -> Callable[[Callable[P, R]], SupportsLangsmithExtra[P, R]]: ...
318def traceable(
319 *args: Any,
320 **kwargs: Any,
321) -> Union[Callable, Callable[[Callable], Callable]]:
322 """Trace a function with langsmith.
324 Args:
325 run_type: The type of run (span) to create.
327 Examples: `llm`, `chain`, `tool`, `prompt`, `retriever`, etc.
329 Defaults to "chain".
330 name: The name of the run. Defaults to the function name.
331 metadata: The metadata to add to the run. Defaults to `None`.
332 tags: The tags to add to the run. Defaults to `None`.
333 client: The client to use for logging the run to LangSmith. Defaults to
334 `None`, which will use the default client.
335 reduce_fn: A function to reduce the output of the function if the function
336 returns a generator.
338 Defaults to `None`, which means the values will be logged as a list.
340 !!! note
342 If the iterator is never exhausted (e.g. the function returns an
343 infinite generator), this will never be called, and the run itself will
344 be stuck in a pending state.
345 project_name: The name of the project to log the run to.
347 Defaults to `None`, which will use the default project.
348 process_inputs: Custom serialization / processing function for inputs.
350 Defaults to `None`.
351 process_outputs: Custom serialization / processing function for outputs.
353 Defaults to `None`.
354 dangerously_allow_filesystem: Whether to allow filesystem access for attachments.
356 Defaults to `False`.
358 Traces that reference local filepaths will be uploaded to LangSmith.
359 In general, network-hosted applications should not be using this because
360 referenced files are usually on the user's machine, not the host machine.
362 Returns:
363 The decorated function.
365 !!! note
367 Requires that `LANGSMITH_TRACING_V2` be set to 'true' in the environment.
369 Examples:
370 !!! example "Basic usage"
372 ```python
373 @traceable
374 def my_function(x: float, y: float) -> float:
375 return x + y
378 my_function(5, 6)
381 @traceable
382 async def my_async_function(query_params: dict) -> dict:
383 async with httpx.AsyncClient() as http_client:
384 response = await http_client.get(
385 "https://api.example.com/data",
386 params=query_params,
387 )
388 return response.json()
391 asyncio.run(my_async_function({"param": "value"}))
392 ```
394 !!! example "Streaming data with a generator"
396 ```python
397 @traceable
398 def my_generator(n: int) -> Iterable:
399 for i in range(n):
400 yield i
403 for item in my_generator(5):
404 print(item)
405 ```
407 !!! example "Async streaming data"
409 ```python
410 @traceable
411 async def my_async_generator(query_params: dict) -> Iterable:
412 async with httpx.AsyncClient() as http_client:
413 response = await http_client.get(
414 "https://api.example.com/data",
415 params=query_params,
416 )
417 for item in response.json():
418 yield item
421 async def async_code():
422 async for item in my_async_generator({"param": "value"}):
423 print(item)
426 asyncio.run(async_code())
427 ```
429 !!! example "Specifying a run type and name"
431 ```python
432 @traceable(name="CustomName", run_type="tool")
433 def another_function(a: float, b: float) -> float:
434 return a * b
437 another_function(5, 6)
438 ```
440 !!! example "Logging with custom metadata and tags"
442 ```python
443 @traceable(
444 metadata={"version": "1.0", "author": "John Doe"}, tags=["beta", "test"]
445 )
446 def tagged_function(x):
447 return x**2
450 tagged_function(5)
451 ```
453 !!! example "Specifying a custom client and project name"
455 ```python
456 custom_client = Client(api_key="your_api_key")
459 @traceable(client=custom_client, project_name="My Special Project")
460 def project_specific_function(data):
461 return data
464 project_specific_function({"data": "to process"})
465 ```
467 !!! example "Manually passing `langsmith_extra`"
469 ```python
470 @traceable
471 def manual_extra_function(x):
472 return x**2
475 manual_extra_function(5, langsmith_extra={"metadata": {"version": "1.0"}})
476 ```
477 """
478 run_type = cast(
479 ls_client.RUN_TYPE_T,
480 (
481 args[0]
482 if args and isinstance(args[0], str)
483 else (kwargs.pop("run_type", None) or "chain")
484 ),
485 )
486 if run_type not in _VALID_RUN_TYPES:
487 warnings.warn(
488 f"Unrecognized run_type: {run_type}. Must be one of: {_VALID_RUN_TYPES}."
489 f" Did you mean @traceable(name='{run_type}')?"
490 )
491 if len(args) > 1:
492 warnings.warn(
493 "The `traceable()` decorator only accepts one positional argument, "
494 "which should be the run_type. All other arguments should be passed "
495 "as keyword arguments."
496 )
497 if "extra" in kwargs:
498 warnings.warn(
499 "The `extra` keyword argument is deprecated. Please use `metadata` "
500 "instead.",
501 DeprecationWarning,
502 )
503 reduce_fn = kwargs.pop("reduce_fn", None)
504 container_input = _ContainerInput(
505 # TODO: Deprecate raw extra
506 extra_outer=kwargs.pop("extra", None),
507 name=kwargs.pop("name", None),
508 metadata=kwargs.pop("metadata", None),
509 tags=kwargs.pop("tags", None),
510 client=kwargs.pop("client", None),
511 project_name=kwargs.pop("project_name", None),
512 run_type=run_type,
513 process_inputs=kwargs.pop("process_inputs", None),
514 process_chunk=kwargs.pop("process_chunk", None),
515 invocation_params_fn=kwargs.pop("_invocation_params_fn", None),
516 dangerously_allow_filesystem=kwargs.pop("dangerously_allow_filesystem", False),
517 )
518 outputs_processor = kwargs.pop("process_outputs", None)
519 _on_run_end = functools.partial(
520 _handle_container_end,
521 outputs_processor=outputs_processor,
522 )
524 if kwargs:
525 warnings.warn(
526 f"The following keyword arguments are not recognized and will be ignored: "
527 f"{sorted(kwargs.keys())}.",
528 DeprecationWarning,
529 )
531 def decorator(func: Callable):
532 func_sig = inspect.signature(func)
533 func_accepts_parent_run = func_sig.parameters.get("run_tree", None) is not None
534 func_accepts_config = func_sig.parameters.get("config", None) is not None
536 @functools.wraps(func)
537 async def async_wrapper(
538 *args: Any,
539 langsmith_extra: Optional[LangSmithExtra] = None,
540 **kwargs: Any,
541 ) -> Any:
542 """Async version of wrapper function."""
543 if not func_accepts_config:
544 kwargs.pop("config", None)
545 run_container = await aitertools.aio_to_thread(
546 _setup_run,
547 func,
548 container_input=container_input,
549 langsmith_extra=langsmith_extra,
550 args=args,
551 kwargs=kwargs,
552 )
554 try:
555 accepts_context = aitertools.asyncio_accepts_context()
556 if func_accepts_parent_run:
557 kwargs["run_tree"] = run_container["new_run"]
559 otel_context_manager = _maybe_create_otel_context(
560 run_container["new_run"]
561 )
562 if otel_context_manager:
564 async def run_with_otel_context():
565 with otel_context_manager:
566 return await func(*args, **kwargs)
568 if accepts_context:
569 function_result = await asyncio.create_task( # type: ignore[call-arg]
570 run_with_otel_context(), context=run_container["context"]
571 )
572 else:
573 # Python < 3.11
574 with tracing_context(
575 **get_tracing_context(run_container["context"])
576 ):
577 function_result = await run_with_otel_context()
578 else:
579 fr_coro = func(*args, **kwargs)
580 if accepts_context:
581 function_result = await asyncio.create_task( # type: ignore[call-arg]
582 fr_coro, context=run_container["context"]
583 )
584 else:
585 # Python < 3.11
586 with tracing_context(
587 **get_tracing_context(run_container["context"])
588 ):
589 function_result = await fr_coro
590 except BaseException as e:
591 # shield from cancellation, given we're catching all exceptions
592 _cleanup_traceback(e)
593 await asyncio.shield(
594 aitertools.aio_to_thread(_on_run_end, run_container, error=e)
595 )
596 raise
597 await aitertools.aio_to_thread(
598 _on_run_end, run_container, outputs=function_result
599 )
600 return function_result
602 @functools.wraps(func)
603 async def async_generator_wrapper(
604 *args: Any, langsmith_extra: Optional[LangSmithExtra] = None, **kwargs: Any
605 ) -> AsyncGenerator:
606 if not func_accepts_config:
607 kwargs.pop("config", None)
608 run_container = await aitertools.aio_to_thread(
609 _setup_run,
610 func,
611 container_input=container_input,
612 langsmith_extra=langsmith_extra,
613 args=args,
614 kwargs=kwargs,
615 )
616 results: list[Any] = []
617 try:
618 if func_accepts_parent_run:
619 kwargs["run_tree"] = run_container["new_run"]
620 # TODO: Nesting is ambiguous if a nested traceable function is only
621 # called mid-generation. Need to explicitly accept run_tree to get
622 # around this.
624 otel_context_manager = _maybe_create_otel_context(
625 run_container["new_run"]
626 )
628 async_gen_result = func(*args, **kwargs)
629 # Can't iterate through if it's a coroutine
630 accepts_context = aitertools.asyncio_accepts_context()
631 if inspect.iscoroutine(async_gen_result):
632 if accepts_context:
633 async_gen_result = await asyncio.create_task(
634 async_gen_result, context=run_container["context"]
635 ) # type: ignore
636 else:
637 # Python < 3.11
638 with tracing_context(
639 **get_tracing_context(run_container["context"])
640 ):
641 async_gen_result = await async_gen_result
643 async for item in _process_async_iterator(
644 generator=async_gen_result,
645 run_container=run_container,
646 is_llm_run=(
647 run_container["new_run"].run_type == "llm"
648 if run_container["new_run"]
649 else False
650 ),
651 accepts_context=accepts_context,
652 results=results,
653 process_chunk=container_input.get("process_chunk"),
654 otel_context_manager=otel_context_manager,
655 ):
656 yield item
657 except BaseException as e:
658 _cleanup_traceback(e)
659 await asyncio.shield(
660 aitertools.aio_to_thread(
661 _on_run_end,
662 run_container,
663 error=e,
664 outputs=_get_function_result(results, reduce_fn),
665 )
666 )
667 raise
668 await aitertools.aio_to_thread(
669 _on_run_end,
670 run_container,
671 outputs=_get_function_result(results, reduce_fn),
672 )
674 @functools.wraps(func)
675 def wrapper(
676 *args: Any,
677 langsmith_extra: Optional[LangSmithExtra] = None,
678 **kwargs: Any,
679 ) -> Any:
680 """Create a new run or create_child() if run is passed in kwargs."""
681 if not func_accepts_config:
682 kwargs.pop("config", None)
683 run_container = _setup_run(
684 func,
685 container_input=container_input,
686 langsmith_extra=langsmith_extra,
687 args=args,
688 kwargs=kwargs,
689 )
690 func_accepts_parent_run = (
691 inspect.signature(func).parameters.get("run_tree", None) is not None
692 )
693 try:
694 if func_accepts_parent_run:
695 kwargs["run_tree"] = run_container["new_run"]
697 otel_context_manager = _maybe_create_otel_context(
698 run_container["new_run"]
699 )
700 if otel_context_manager:
702 def run_with_otel_context():
703 with otel_context_manager:
704 return func(*args, **kwargs)
706 function_result = run_container["context"].run(
707 run_with_otel_context
708 )
709 else:
710 function_result = run_container["context"].run(
711 func, *args, **kwargs
712 )
713 except BaseException as e:
714 _cleanup_traceback(e)
715 _on_run_end(run_container, error=e)
716 raise
717 _on_run_end(run_container, outputs=function_result)
718 return function_result
720 @functools.wraps(func)
721 def generator_wrapper(
722 *args: Any, langsmith_extra: Optional[LangSmithExtra] = None, **kwargs: Any
723 ) -> Any:
724 if not func_accepts_config:
725 kwargs.pop("config", None)
726 run_container = _setup_run(
727 func,
728 container_input=container_input,
729 langsmith_extra=langsmith_extra,
730 args=args,
731 kwargs=kwargs,
732 )
733 func_accepts_parent_run = (
734 inspect.signature(func).parameters.get("run_tree", None) is not None
735 )
736 results: list[Any] = []
737 function_return: Any = None
739 try:
740 if func_accepts_parent_run:
741 kwargs["run_tree"] = run_container["new_run"]
743 generator_result = run_container["context"].run(func, *args, **kwargs)
745 otel_context_manager = _maybe_create_otel_context(
746 run_container["new_run"]
747 )
749 function_return = yield from _process_iterator(
750 generator_result,
751 run_container,
752 is_llm_run=run_type == "llm",
753 results=results,
754 process_chunk=container_input.get("process_chunk"),
755 otel_context_manager=otel_context_manager,
756 )
758 if function_return is not None:
759 results.append(function_return)
761 except BaseException as e:
762 _cleanup_traceback(e)
763 _on_run_end(
764 run_container,
765 error=e,
766 outputs=_get_function_result(results, reduce_fn),
767 )
768 raise
769 _on_run_end(run_container, outputs=_get_function_result(results, reduce_fn))
771 return function_return
773 # "Stream" functions (used in methods like OpenAI/Anthropic's SDKs)
774 # are functions that return iterable responses and should not be
775 # considered complete until the streaming is completed
776 @functools.wraps(func)
777 def stream_wrapper(
778 *args: Any, langsmith_extra: Optional[LangSmithExtra] = None, **kwargs: Any
779 ) -> Any:
780 if not func_accepts_config:
781 kwargs.pop("config", None)
782 trace_container = _setup_run(
783 func,
784 container_input=container_input,
785 langsmith_extra=langsmith_extra,
786 args=args,
787 kwargs=kwargs,
788 )
790 try:
791 if func_accepts_parent_run:
792 kwargs["run_tree"] = trace_container["new_run"]
793 stream = trace_container["context"].run(func, *args, **kwargs)
794 except Exception as e:
795 _cleanup_traceback(e)
796 _on_run_end(trace_container, error=e)
797 raise
799 if hasattr(stream, "__iter__"):
800 return _TracedStream(stream, trace_container, reduce_fn)
801 elif hasattr(stream, "__aiter__"):
802 # sync function -> async iterable (unexpected)
803 return _TracedAsyncStream(stream, trace_container, reduce_fn)
805 # If it's not iterable, end the trace immediately
806 _on_run_end(trace_container, outputs=stream)
807 return stream
809 @functools.wraps(func)
810 async def async_stream_wrapper(
811 *args: Any, langsmith_extra: Optional[LangSmithExtra] = None, **kwargs: Any
812 ) -> Any:
813 if not func_accepts_config:
814 kwargs.pop("config", None)
815 trace_container = await aitertools.aio_to_thread(
816 _setup_run,
817 func,
818 container_input=container_input,
819 langsmith_extra=langsmith_extra,
820 args=args,
821 kwargs=kwargs,
822 )
824 try:
825 if func_accepts_parent_run:
826 kwargs["run_tree"] = trace_container["new_run"]
827 stream = await func(*args, **kwargs)
828 except Exception as e:
829 await aitertools.aio_to_thread(_on_run_end, trace_container, error=e)
830 raise
832 if hasattr(stream, "__aiter__"):
833 return _TracedAsyncStream(stream, trace_container, reduce_fn)
834 elif hasattr(stream, "__iter__"):
835 # Async function -> sync iterable
836 return _TracedStream(stream, trace_container, reduce_fn)
838 # If it's not iterable, end the trace immediately
839 await aitertools.aio_to_thread(_on_run_end, trace_container, outputs=stream)
840 return stream
842 if inspect.isasyncgenfunction(func):
843 selected_wrapper: Callable = async_generator_wrapper
844 elif inspect.isgeneratorfunction(func):
845 selected_wrapper = generator_wrapper
846 elif is_async(func):
847 if reduce_fn:
848 selected_wrapper = async_stream_wrapper
849 else:
850 selected_wrapper = async_wrapper
851 else:
852 if reduce_fn:
853 selected_wrapper = stream_wrapper
854 else:
855 selected_wrapper = wrapper
856 setattr(selected_wrapper, "__langsmith_traceable__", True)
857 sig = inspect.signature(selected_wrapper)
858 if not sig.parameters.get("config"):
859 sig = sig.replace(
860 parameters=[
861 *(
862 param
863 for param in sig.parameters.values()
864 if param.kind != inspect.Parameter.VAR_KEYWORD
865 ),
866 inspect.Parameter(
867 "config", inspect.Parameter.KEYWORD_ONLY, default=None
868 ),
869 *(
870 param
871 for param in sig.parameters.values()
872 if param.kind == inspect.Parameter.VAR_KEYWORD
873 ),
874 ]
875 )
876 selected_wrapper.__signature__ = sig # type: ignore[attr-defined]
877 return selected_wrapper
879 # If the decorator is called with no arguments, then it's being used as a
880 # decorator, so we return the decorator function
881 if len(args) == 1 and callable(args[0]) and not kwargs:
882 return decorator(args[0])
883 # Else it's being used as a decorator factory, so we return the decorator
884 return decorator
887class trace:
888 """Manage a LangSmith run in context.
890 This class can be used as both a synchronous and asynchronous context manager.
892 Args:
893 name: Name of the run.
894 run_type: Type of run (e.g., `'chain'`, `'llm'`, `'tool'`).
895 inputs: Initial input data for the run.
896 project_name: Project name to associate the run with.
897 parent: Parent run.
899 Can be a `RunTree`, dotted order string, or tracing headers.
900 tags: List of tags for the run.
901 metadata: Additional metadata for the run.
902 client: LangSmith client for custom settings.
903 run_id: Preset identifier for the run.
904 reference_example_id: Associates run with a dataset example.
906 Only for root runs in evaluation.
907 exceptions_to_handle: Exception types to ignore.
908 extra: Extra data to send to LangSmith.
910 Use 'metadata' instead.
912 Examples:
913 Synchronous usage:
915 ```python
916 with trace("My Operation", run_type="tool", tags=["important"]) as run:
917 result = "foo" # Perform operation
918 run.metadata["some-key"] = "some-value"
919 run.end(outputs={"result": result})
920 ```
922 Asynchronous usage:
924 ```python
925 async def main():
926 async with trace("Async Operation", run_type="tool", tags=["async"]) as run:
927 result = "foo" # Await async operation
928 run.metadata["some-key"] = "some-value"
929 # "end" just adds the outputs and sets error to None
930 # The actual patching of the run happens when the context exits
931 run.end(outputs={"result": result})
934 asyncio.run(main())
935 ```
937 Handling specific exceptions:
939 ```python
940 import pytest
941 import sys
943 with trace("Test", exceptions_to_handle=(pytest.skip.Exception,)):
944 if sys.platform == "win32": # Just an example
945 pytest.skip("Skipping test for windows")
946 result = "foo" # Perform test operation
947 ```
948 """
950 def __init__(
951 self,
952 name: str,
953 run_type: ls_client.RUN_TYPE_T = "chain",
954 *,
955 inputs: Optional[dict] = None,
956 extra: Optional[dict] = None,
957 project_name: Optional[str] = None,
958 parent: Optional[
959 Union[run_trees.RunTree, str, Mapping, Literal["ignore"]]
960 ] = None,
961 tags: Optional[list[str]] = None,
962 metadata: Optional[Mapping[str, Any]] = None,
963 client: Optional[ls_client.Client] = None,
964 run_id: Optional[ls_client.ID_TYPE] = None,
965 reference_example_id: Optional[ls_client.ID_TYPE] = None,
966 exceptions_to_handle: Optional[tuple[type[BaseException], ...]] = None,
967 attachments: Optional[schemas.Attachments] = None,
968 **kwargs: Any,
969 ):
970 """Initialize the trace context manager.
972 Warns if unsupported kwargs are passed.
973 """
974 self._end_on_exit = kwargs.pop("_end_on_exit", True)
975 if kwargs:
976 warnings.warn(
977 "The `trace` context manager no longer supports the following kwargs: "
978 f"{sorted(kwargs.keys())}.",
979 DeprecationWarning,
980 )
981 self.name = name
982 self.run_type = run_type
983 self.inputs = inputs
984 self.attachments = attachments
985 self.extra = extra
986 self.project_name = project_name
987 self.parent = parent
988 # The run tree is deprecated. Keeping for backwards compat.
989 # Will fully merge within parent later.
990 self.run_tree = kwargs.get("run_tree")
991 self.tags = tags
992 self.metadata = metadata
993 self.client = client
994 self.run_id = run_id
995 self.reference_example_id = reference_example_id
996 self.exceptions_to_handle = exceptions_to_handle
997 self.new_run: Optional[run_trees.RunTree] = None
998 self.old_ctx: Optional[dict] = None
1000 def _setup(self) -> run_trees.RunTree:
1001 """Set up the tracing context and create a new run.
1003 This method initializes the tracing context, merges tags and metadata,
1004 creates a new run (either as a child of an existing run or as a new root run),
1005 and sets up the necessary context variables.
1007 Returns:
1008 run_trees.RunTree: The newly created run.
1009 """
1010 self.old_ctx = get_tracing_context()
1011 enabled = utils.tracing_is_enabled(self.old_ctx)
1013 outer_tags = _context._TAGS.get() or _context._GLOBAL_TAGS
1014 outer_metadata = _context._METADATA.get() or _context._GLOBAL_METADATA
1015 client_ = self.client or self.old_ctx.get("client")
1016 parent_run_ = _get_parent_run(
1017 {
1018 "parent": self.parent,
1019 "run_tree": self.run_tree,
1020 "client": client_,
1021 "project_name": self.project_name,
1022 }
1023 )
1025 tags_ = sorted(set((self.tags or []) + (outer_tags or [])))
1026 metadata = {
1027 **(self.metadata or {}),
1028 **(outer_metadata or {}),
1029 "ls_method": "trace",
1030 }
1032 extra_outer = self.extra or {}
1033 extra_outer["metadata"] = metadata
1035 project_name_ = _get_project_name(self.project_name)
1037 if parent_run_ is not None and enabled:
1038 self.new_run = parent_run_.create_child(
1039 name=self.name,
1040 run_id=self.run_id,
1041 run_type=self.run_type,
1042 extra=extra_outer,
1043 inputs=self.inputs,
1044 tags=tags_,
1045 attachments=self.attachments,
1046 )
1047 else:
1048 self.new_run = run_trees.RunTree(
1049 name=self.name,
1050 id=ls_client._ensure_uuid(self.run_id),
1051 reference_example_id=ls_client._ensure_uuid(
1052 self.reference_example_id, accept_null=True
1053 ),
1054 run_type=self.run_type,
1055 extra=extra_outer,
1056 project_name=project_name_ or "default",
1057 replicas=run_trees._REPLICAS.get(),
1058 inputs=self.inputs or {},
1059 tags=tags_,
1060 client=client_, # type: ignore
1061 attachments=self.attachments or {}, # type: ignore
1062 )
1064 if enabled is True:
1065 self.new_run.post()
1066 if enabled:
1067 _context._TAGS.set(tags_)
1068 _context._METADATA.set(metadata)
1069 _context._PARENT_RUN_TREE.set(self.new_run)
1070 _context._PROJECT_NAME.set(project_name_)
1071 _context._CLIENT.set(client_)
1073 return self.new_run
1075 def _teardown(
1076 self,
1077 exc_type: Optional[type[BaseException]],
1078 exc_value: Optional[BaseException],
1079 traceback: Optional[TracebackType],
1080 ) -> None:
1081 """Clean up the tracing context and finalize the run.
1083 This method handles exceptions, ends the run if necessary,
1084 patches the run if it's not disabled, and resets the tracing context.
1086 Args:
1087 exc_type: The type of the exception that occurred, if any.
1088 exc_value: The exception instance that occurred, if any.
1089 traceback: The traceback object associated with the exception, if any.
1090 """
1091 if self.new_run is None:
1092 return
1093 if exc_type is not None:
1094 if self.exceptions_to_handle and issubclass(
1095 exc_type, self.exceptions_to_handle
1096 ):
1097 tb = None
1098 else:
1099 tb = utils._format_exc()
1100 tb = f"{exc_type.__name__}: {exc_value}\n\n{tb}"
1101 self.new_run.end(error=tb)
1102 if self.old_ctx is not None:
1103 enabled = utils.tracing_is_enabled(self.old_ctx)
1104 if enabled is True and self._end_on_exit:
1105 self.new_run.patch()
1107 _set_tracing_context(self.old_ctx)
1108 else:
1109 warnings.warn("Tracing context was not set up properly.", RuntimeWarning)
1111 def __enter__(self) -> run_trees.RunTree:
1112 """Enter the context manager synchronously.
1114 Returns:
1115 run_trees.RunTree: The newly created run.
1116 """
1117 return self._setup()
1119 def __exit__(
1120 self,
1121 exc_type: Optional[type[BaseException]] = None,
1122 exc_value: Optional[BaseException] = None,
1123 traceback: Optional[TracebackType] = None,
1124 ) -> None:
1125 """Exit the context manager synchronously.
1127 Args:
1128 exc_type: The type of the exception that occurred, if any.
1129 exc_value: The exception instance that occurred, if any.
1130 traceback: The traceback object associated with the exception, if any.
1131 """
1132 self._teardown(exc_type, exc_value, traceback)
1134 async def __aenter__(self) -> run_trees.RunTree:
1135 """Enter the context manager asynchronously.
1137 Returns:
1138 run_trees.RunTree: The newly created run.
1139 """
1140 ctx = copy_context()
1141 result = await aitertools.aio_to_thread(self._setup, __ctx=ctx)
1142 # Set the context for the current thread
1143 _set_tracing_context(get_tracing_context(ctx))
1144 return result
1146 async def __aexit__(
1147 self,
1148 exc_type: Optional[type[BaseException]] = None,
1149 exc_value: Optional[BaseException] = None,
1150 traceback: Optional[TracebackType] = None,
1151 ) -> None:
1152 """Exit the context manager asynchronously.
1154 Args:
1155 exc_type: The type of the exception that occurred, if any.
1156 exc_value: The exception instance that occurred, if any.
1157 traceback: The traceback object associated with the exception, if any.
1158 """
1159 ctx = copy_context()
1160 if exc_type is not None:
1161 await asyncio.shield(
1162 aitertools.aio_to_thread(
1163 self._teardown, exc_type, exc_value, traceback, __ctx=ctx
1164 )
1165 )
1166 else:
1167 await aitertools.aio_to_thread(
1168 self._teardown, exc_type, exc_value, traceback, __ctx=ctx
1169 )
1170 _set_tracing_context(get_tracing_context(ctx))
1173def _get_project_name(project_name: Optional[str]) -> Optional[str]:
1174 if project_name:
1175 return project_name
1176 prt = _PARENT_RUN_TREE.get()
1177 return (
1178 # Maintain tree consistency first
1179 _context._PROJECT_NAME.get()
1180 or (prt.session_name if prt else None)
1181 # Global fallback configured via ls.configure(...)
1182 or _context._GLOBAL_PROJECT_NAME
1183 # fallback to the default for the environment
1184 or utils.get_tracer_project()
1185 )
1188def as_runnable(traceable_fn: Callable) -> Runnable:
1189 """Convert a function wrapped by the LangSmith `@traceable` decorator to a `Runnable`.
1191 Args:
1192 traceable_fn: The function wrapped by the `@traceable` decorator.
1194 Returns:
1195 Runnable: A `Runnable` object that maintains a consistent LangSmith
1196 tracing context.
1198 Raises:
1199 ImportError: If `langchain` module is not installed.
1200 ValueError: If the provided function is not wrapped by the `@traceable` decorator.
1202 Example:
1203 >>> @traceable
1204 ... def my_function(input_data):
1205 ... # Function implementation
1206 ... pass
1207 >>> runnable = as_runnable(my_function)
1208 """
1209 try:
1210 from langchain_core.runnables import RunnableConfig, RunnableLambda
1211 from langchain_core.runnables.utils import Input, Output
1212 except ImportError as e:
1213 raise ImportError(
1214 "as_runnable requires langchain-core to be installed. "
1215 "You can install it with `pip install langchain-core`."
1216 ) from e
1217 if not is_traceable_function(traceable_fn):
1218 try:
1219 fn_src = inspect.getsource(traceable_fn)
1220 except Exception:
1221 fn_src = "<source unavailable>"
1222 raise ValueError(
1223 f"as_runnable expects a function wrapped by the LangSmith"
1224 f" @traceable decorator. Got {traceable_fn} defined as:\n{fn_src}"
1225 )
1227 class RunnableTraceable(RunnableLambda):
1228 """Converts a `@traceable` decorated function to a `Runnable`.
1230 This helps maintain a consistent LangSmith tracing context.
1231 """
1233 def __init__(
1234 self,
1235 func: Callable,
1236 afunc: Optional[Callable[..., Awaitable[Output]]] = None,
1237 ) -> None:
1238 wrapped: Optional[Callable[[Input], Output]] = None
1239 awrapped = self._wrap_async(afunc)
1240 if is_async(func):
1241 if awrapped is not None:
1242 raise TypeError(
1243 "Func was provided as a coroutine function, but afunc was "
1244 "also provided. If providing both, func should be a regular "
1245 "function to avoid ambiguity."
1246 )
1247 wrapped = cast(Callable[[Input], Output], self._wrap_async(func))
1248 elif is_traceable_function(func):
1249 wrapped = cast(Callable[[Input], Output], self._wrap_sync(func))
1250 if wrapped is None:
1251 raise ValueError(
1252 f"{self.__class__.__name__} expects a function wrapped by"
1253 " the LangSmith"
1254 f" @traceable decorator. Got {func}"
1255 )
1257 super().__init__(
1258 wrapped,
1259 cast(
1260 Optional[Callable[[Input], Awaitable[Output]]],
1261 awrapped,
1262 ),
1263 )
1265 @staticmethod
1266 def _wrap_sync(
1267 func: Callable[..., Output],
1268 ) -> Callable[[Input, RunnableConfig], Output]:
1269 """Wrap a synchronous function to make it asynchronous."""
1271 def wrap_traceable(inputs: dict, config: RunnableConfig) -> Any:
1272 run_tree = run_trees.RunTree.from_runnable_config(cast(dict, config))
1273 return func(**inputs, langsmith_extra={"run_tree": run_tree})
1275 return cast(Callable[[Input, RunnableConfig], Output], wrap_traceable)
1277 @staticmethod
1278 def _wrap_async(
1279 afunc: Optional[Callable[..., Awaitable[Output]]],
1280 ) -> Optional[Callable[[Input, RunnableConfig], Awaitable[Output]]]:
1281 """Wrap an async function to make it synchronous."""
1282 if afunc is None:
1283 return None
1285 if not is_traceable_function(afunc):
1286 raise ValueError(
1287 "RunnableTraceable expects a function wrapped by the LangSmith"
1288 f" @traceable decorator. Got {afunc}"
1289 )
1290 afunc_ = cast(Callable[..., Awaitable[Output]], afunc)
1292 async def awrap_traceable(inputs: dict, config: RunnableConfig) -> Any:
1293 run_tree = run_trees.RunTree.from_runnable_config(cast(dict, config))
1294 return await afunc_(**inputs, langsmith_extra={"run_tree": run_tree})
1296 return cast(
1297 Callable[[Input, RunnableConfig], Awaitable[Output]], awrap_traceable
1298 )
1300 return RunnableTraceable(traceable_fn)
1303## Private Methods and Objects
1304_VALID_RUN_TYPES = {
1305 "tool",
1306 "chain",
1307 "llm",
1308 "retriever",
1309 "embedding",
1310 "prompt",
1311 "parser",
1312}
1315class _TraceableContainer(TypedDict, total=False):
1316 """Typed response when initializing a run a traceable."""
1318 new_run: Optional[run_trees.RunTree]
1319 project_name: Optional[str]
1320 outer_project: Optional[str]
1321 outer_metadata: Optional[dict[str, Any]]
1322 outer_tags: Optional[list[str]]
1323 _on_success: Optional[Callable[[run_trees.RunTree], Any]]
1324 on_end: Optional[Callable[[run_trees.RunTree], Any]]
1325 context: contextvars.Context
1326 _token_event_logged: Optional[bool]
1329class _ContainerInput(TypedDict, total=False):
1330 """Typed response when initializing a run a traceable."""
1332 extra_outer: Optional[dict]
1333 name: Optional[str]
1334 metadata: Optional[dict[str, Any]]
1335 tags: Optional[list[str]]
1336 client: Optional[ls_client.Client]
1337 reduce_fn: Optional[Callable]
1338 project_name: Optional[str]
1339 run_type: ls_client.RUN_TYPE_T
1340 process_inputs: Optional[Callable[[dict], dict]]
1341 process_chunk: Optional[Callable]
1342 invocation_params_fn: Optional[Callable[[dict], dict]]
1343 dangerously_allow_filesystem: Optional[bool]
1346def _container_end(
1347 container: _TraceableContainer,
1348 outputs: Optional[Any] = None,
1349 error: Optional[BaseException] = None,
1350) -> None:
1351 """End the run."""
1352 run_tree = container.get("new_run")
1353 if run_tree is None:
1354 # Tracing not enabled
1355 return
1356 if isinstance(outputs, dict):
1357 dict_outputs = outputs
1358 elif (
1359 outputs is not None
1360 and hasattr(outputs, "model_dump")
1361 and callable(outputs.model_dump)
1362 and not isinstance(outputs, type)
1363 ):
1364 try:
1365 dict_outputs = outputs.model_dump(exclude_none=True, mode="json")
1366 except Exception as e:
1367 LOGGER.debug(
1368 f"Failed to use model_dump to serialize {type(outputs)} to JSON: {e}"
1369 )
1370 dict_outputs = {"output": outputs}
1371 else:
1372 dict_outputs = {"output": outputs}
1373 if (usage := _extract_usage(run_tree=run_tree, outputs=dict_outputs)) is not None:
1374 run_tree.metadata["usage_metadata"] = usage
1375 if error:
1376 stacktrace = utils._format_exc()
1377 error_repr = f"{repr(error)}\n\n{stacktrace}"
1378 else:
1379 error_repr = None
1380 if (_on_success := container.get("_on_success")) and callable(_on_success):
1381 try:
1382 _on_success(run_tree)
1383 except BaseException as e:
1384 warnings.warn(f"Failed to run _on_success function: {e}")
1386 run_tree.end(outputs=dict_outputs, error=error_repr)
1387 if utils.tracing_is_enabled() is True:
1388 run_tree.patch()
1389 if (on_end := container.get("on_end")) and callable(on_end):
1390 try:
1391 on_end(run_tree)
1392 except BaseException as e:
1393 warnings.warn(f"Failed to run on_end function: {e}")
1396def _collect_extra(extra_outer: dict, langsmith_extra: LangSmithExtra) -> dict:
1397 run_extra = langsmith_extra.get("run_extra", None)
1398 if run_extra:
1399 extra_inner = {**extra_outer, **run_extra}
1400 else:
1401 extra_inner = extra_outer
1402 return extra_inner
1405def _get_parent_run(
1406 langsmith_extra: LangSmithExtra,
1407 config: Optional[dict] = None,
1408) -> Optional[run_trees.RunTree]:
1409 parent = langsmith_extra.get("parent")
1410 if parent == "ignore":
1411 return None
1412 if isinstance(parent, run_trees.RunTree):
1413 return parent
1414 if isinstance(parent, Mapping):
1415 return run_trees.RunTree.from_headers(
1416 parent,
1417 client=langsmith_extra.get("client"),
1418 # Precedence: headers -> cvar -> explicit -> env var
1419 project_name=_get_project_name(langsmith_extra.get("project_name")),
1420 )
1421 if isinstance(parent, str):
1422 dort = run_trees.RunTree.from_dotted_order(
1423 parent,
1424 client=langsmith_extra.get("client"),
1425 # Precedence: cvar -> explicit -> env var
1426 project_name=_get_project_name(langsmith_extra.get("project_name")),
1427 )
1428 return dort
1429 run_tree = langsmith_extra.get("run_tree")
1430 if run_tree:
1431 return run_tree
1432 crt = get_current_run_tree()
1433 if _runtime_env.get_langchain_core_version() is not None:
1434 if rt := run_trees.RunTree.from_runnable_config(
1435 config, client=langsmith_extra.get("client")
1436 ):
1437 # Still need to break ties when alternating between traceable and
1438 # LanChain code.
1439 # Nesting: LC -> LS -> LS, we want to still use LS as the parent
1440 # Otherwise would look like LC -> {LS, LS} (siblings)
1441 if (
1442 not crt # Simple LC -> LS
1443 # Let user override if manually passed in or invoked in a
1444 # RunnableSequence. This is a naive check.
1445 or (config is not None and config.get("callbacks"))
1446 # If the LangChain dotted order is more nested than the LangSmith
1447 # dotted order, use the LangChain run as the parent.
1448 # Note that this condition shouldn't be triggered in later
1449 # versions of core, since we also update the run_tree context
1450 # vars when updating the RunnableConfig context var.
1451 or rt.dotted_order > crt.dotted_order
1452 ):
1453 return rt
1454 return crt
1457def _setup_run(
1458 func: Callable,
1459 container_input: _ContainerInput,
1460 langsmith_extra: Optional[LangSmithExtra] = None,
1461 args: Any = None,
1462 kwargs: Any = None,
1463) -> _TraceableContainer:
1464 """Create a new run or create_child() if run is passed in kwargs."""
1465 extra_outer = container_input.get("extra_outer") or {}
1466 metadata = container_input.get("metadata")
1467 tags = container_input.get("tags")
1468 client = container_input.get("client")
1469 run_type = container_input.get("run_type") or "chain"
1470 dangerously_allow_filesystem = container_input.get(
1471 "dangerously_allow_filesystem", False
1472 )
1473 outer_project = _context._PROJECT_NAME.get()
1474 langsmith_extra = langsmith_extra or LangSmithExtra()
1475 name = langsmith_extra.get("name") or container_input.get("name")
1476 client_ = langsmith_extra.get("client", client) or _context._CLIENT.get()
1477 parent_run_ = _get_parent_run(
1478 {**langsmith_extra, "client": client_}, kwargs.get("config")
1479 )
1480 project_cv = _context._PROJECT_NAME.get()
1481 selected_project = (
1482 project_cv # From parent trace
1483 or (
1484 parent_run_.session_name if parent_run_ else None
1485 ) # from parent run attempt 2 (not managed by traceable)
1486 or langsmith_extra.get("project_name") # at invocation time
1487 or container_input["project_name"] # at decorator time
1488 or _context._GLOBAL_PROJECT_NAME # global fallback from ls.configure
1489 or utils.get_tracer_project() # default
1490 )
1491 reference_example_id = langsmith_extra.get("reference_example_id")
1492 id_ = langsmith_extra.get("run_id")
1493 if not parent_run_ and not utils.tracing_is_enabled():
1494 utils.log_once(
1495 logging.DEBUG,
1496 "LangSmith tracing is not enabled, returning original function.",
1497 )
1498 return _TraceableContainer(
1499 new_run=None,
1500 project_name=selected_project,
1501 outer_project=outer_project,
1502 outer_metadata=None,
1503 outer_tags=None,
1504 _on_success=langsmith_extra.get("_on_success"),
1505 on_end=langsmith_extra.get("on_end"),
1506 context=copy_context(),
1507 _token_event_logged=False,
1508 )
1509 signature = inspect.signature(func)
1510 name_ = name or utils._get_function_name(func)
1511 extra_inner = _collect_extra(extra_outer, langsmith_extra)
1512 outer_metadata = _context._METADATA.get() or _context._GLOBAL_METADATA
1513 outer_tags = _context._TAGS.get() or _context._GLOBAL_TAGS
1514 context = copy_context()
1515 metadata_ = {
1516 **(langsmith_extra.get("metadata") or {}),
1517 **(outer_metadata or {}),
1518 }
1519 context.run(_context._METADATA.set, metadata_)
1520 metadata_.update(metadata or {})
1521 metadata_["ls_method"] = "traceable"
1522 extra_inner["metadata"] = metadata_
1523 inputs, attachments = _get_inputs_and_attachments_safe(
1524 signature, *args, func=func, **kwargs
1525 )
1526 invocation_params_fn = container_input.get("invocation_params_fn")
1527 if invocation_params_fn:
1528 try:
1529 invocation_params = {
1530 k: v for k, v in invocation_params_fn(inputs).items() if v is not None
1531 }
1532 if invocation_params and isinstance(invocation_params, dict):
1533 metadata_.update(invocation_params)
1534 except BaseException as e:
1535 LOGGER.error(f"Failed to infer invocation params for {name_}: {e}")
1536 process_inputs = container_input.get("process_inputs")
1537 if process_inputs:
1538 try:
1539 inputs = process_inputs(inputs)
1540 except BaseException as e:
1541 LOGGER.error(f"Failed to filter inputs for {name_}: {e}")
1542 tags_ = (langsmith_extra.get("tags") or []) + (outer_tags or [])
1543 context.run(_context._TAGS.set, tags_)
1544 tags_ += tags or []
1545 if parent_run_ is not None:
1546 new_run = parent_run_.create_child(
1547 name=name_,
1548 run_type=run_type,
1549 inputs=inputs,
1550 tags=tags_,
1551 extra=extra_inner,
1552 run_id=id_,
1553 attachments=attachments,
1554 )
1555 else:
1556 # Create RunTree kwargs conditionally to let RunTree generate id from start_time
1557 run_tree_kwargs = {
1558 "name": name_,
1559 "inputs": inputs,
1560 "run_type": run_type,
1561 "reference_example_id": ls_client._ensure_uuid(
1562 reference_example_id, accept_null=True
1563 ),
1564 "project_name": selected_project,
1565 "replicas": run_trees._REPLICAS.get(),
1566 "extra": extra_inner,
1567 "tags": tags_,
1568 "client": client_,
1569 "attachments": attachments,
1570 "dangerously_allow_filesystem": dangerously_allow_filesystem,
1571 }
1572 # Only pass id if user explicitly provided one
1573 if id_ is not None:
1574 run_tree_kwargs["id"] = ls_client._ensure_uuid(id_)
1575 new_run = run_trees.RunTree(**cast(Any, run_tree_kwargs))
1576 if utils.tracing_is_enabled() is True:
1577 try:
1578 new_run.post()
1579 except BaseException as e:
1580 LOGGER.error(f"Failed to post run {new_run.id}: {e}")
1581 response_container = _TraceableContainer(
1582 new_run=new_run,
1583 project_name=selected_project,
1584 outer_project=outer_project,
1585 outer_metadata=outer_metadata,
1586 outer_tags=outer_tags,
1587 on_end=langsmith_extra.get("on_end"),
1588 _on_success=langsmith_extra.get("_on_success"),
1589 context=context,
1590 _token_event_logged=False,
1591 )
1592 context.run(_context._PROJECT_NAME.set, response_container["project_name"])
1593 context.run(_PARENT_RUN_TREE.set, response_container["new_run"])
1594 return response_container
1597def _handle_container_end(
1598 container: _TraceableContainer,
1599 outputs: Optional[Any] = None,
1600 error: Optional[BaseException] = None,
1601 outputs_processor: Optional[Callable[..., dict]] = None,
1602) -> None:
1603 """Handle the end of run."""
1604 try:
1605 if outputs_processor is not None:
1606 outputs = outputs_processor(outputs)
1607 _container_end(container, outputs=outputs, error=error)
1608 except BaseException as e:
1609 LOGGER.warning(f"Unable to process trace outputs: {repr(e)}")
1612def _is_traceable_function(func: Any) -> bool:
1613 return getattr(func, "__langsmith_traceable__", False)
1616def _get_inputs(
1617 signature: inspect.Signature, *args: Any, **kwargs: Any
1618) -> dict[str, Any]:
1619 """Return a dictionary of inputs from the function signature."""
1620 bound = signature.bind_partial(*args, **kwargs)
1621 bound.apply_defaults()
1622 arguments = dict(bound.arguments)
1623 arguments.pop("self", None)
1624 arguments.pop("cls", None)
1625 for param_name, param in signature.parameters.items():
1626 if param.kind == inspect.Parameter.VAR_KEYWORD:
1627 # Update with the **kwargs, and remove the original entry
1628 # This is to help flatten out keyword arguments
1629 if param_name in arguments:
1630 arguments.update(arguments[param_name])
1631 arguments.pop(param_name)
1633 return arguments
1636def _get_inputs_safe(
1637 signature: inspect.Signature, *args: Any, **kwargs: Any
1638) -> dict[str, Any]:
1639 try:
1640 return _get_inputs(signature, *args, **kwargs)
1641 except BaseException as e:
1642 LOGGER.debug(f"Failed to get inputs for {signature}: {e}")
1643 return {"args": args, "kwargs": kwargs}
1646def _is_attachment(param: inspect.Parameter, func: Optional[Callable] = None) -> bool:
1647 if param.annotation == schemas.Attachment or (
1648 get_origin(param.annotation) == Annotated
1649 and any(arg == schemas.Attachment for arg in get_args(param.annotation))
1650 ):
1651 return True
1653 # try resolving stringified annotations
1654 if func is not None and isinstance(param.annotation, str):
1655 try:
1656 # include_extras=True preserves annotated metadata
1657 type_hints = get_type_hints(func, include_extras=True)
1658 resolved_annotation = type_hints.get(param.name)
1659 if resolved_annotation is not None:
1660 return resolved_annotation == schemas.Attachment or (
1661 get_origin(resolved_annotation) == Annotated
1662 and any(
1663 arg == schemas.Attachment
1664 for arg in get_args(resolved_annotation)
1665 )
1666 )
1667 except (NameError, TypeError, AttributeError):
1668 pass
1670 return False
1673def _attachment_args_helper(
1674 signature: inspect.Signature, func: Optional[Callable] = None
1675) -> set[str]:
1676 return {
1677 name
1678 for name, param in signature.parameters.items()
1679 if _is_attachment(param, func)
1680 }
1683@functools.lru_cache(maxsize=1000)
1684def _cached_attachment_args(
1685 signature: inspect.Signature, func: Optional[Callable] = None
1686) -> set[str]:
1687 return _attachment_args_helper(signature, func)
1690def _attachment_args(
1691 signature: inspect.Signature, func: Optional[Callable] = None
1692) -> set[str]:
1693 # Caching signatures fails if there's unhashable default values.
1694 try:
1695 return _cached_attachment_args(signature, func)
1696 except TypeError:
1697 return _attachment_args_helper(signature, func)
1700def _get_inputs_and_attachments_safe(
1701 signature: inspect.Signature,
1702 *args: Any,
1703 func: Optional[Callable] = None,
1704 **kwargs: Any,
1705) -> tuple[dict, schemas.Attachments]:
1706 try:
1707 inferred = _get_inputs(signature, *args, **kwargs)
1708 attachment_args = _attachment_args(signature, func)
1709 if attachment_args:
1710 inputs, attachments = {}, {}
1711 for k, v in inferred.items():
1712 if k in attachment_args:
1713 attachments[k] = v
1714 else:
1715 inputs[k] = v
1716 return inputs, attachments
1717 return inferred, {}
1718 except BaseException as e:
1719 LOGGER.warning(f"Failed to get inputs for {signature}: {e}")
1720 return {"args": args, "kwargs": kwargs}, {}
1723def _set_tracing_context(context: Optional[dict[str, Any]] = None):
1724 """Set the tracing context."""
1725 if context is None:
1726 for k, v in _CONTEXT_KEYS.items():
1727 v.set(None)
1728 return
1729 for k, v in context.items():
1730 var = _CONTEXT_KEYS[k]
1731 var.set(v)
1734def _process_iterator(
1735 generator: Iterator[T],
1736 run_container: _TraceableContainer,
1737 is_llm_run: bool,
1738 # Results is mutated
1739 results: list[Any],
1740 process_chunk: Optional[Callable],
1741 otel_context_manager: Optional[Any] = None,
1742) -> Generator[T, None, Any]:
1743 try:
1744 while True:
1745 if otel_context_manager:
1746 # Create a fresh context manager for each iteration
1747 def next_with_otel_context():
1748 # Get the run_tree from run_container to create a fresh context
1749 run_tree = run_container.get("new_run")
1750 if run_tree:
1751 fresh_otel_context = _maybe_create_otel_context(run_tree)
1752 if fresh_otel_context:
1753 with fresh_otel_context:
1754 return next(generator)
1755 return next(generator)
1757 item: T = run_container["context"].run(next_with_otel_context) # type: ignore[arg-type]
1758 else:
1759 item = run_container["context"].run(next, generator) # type: ignore[arg-type]
1760 if process_chunk:
1761 traced_item = process_chunk(item)
1762 else:
1763 traced_item = item
1764 if (
1765 is_llm_run
1766 and run_container["new_run"]
1767 and not run_container.get("_token_event_logged")
1768 ):
1769 run_container["new_run"].add_event(
1770 {
1771 "name": "new_token",
1772 "time": datetime.datetime.now(
1773 datetime.timezone.utc
1774 ).isoformat(),
1775 "kwargs": {"token": traced_item},
1776 }
1777 )
1778 run_container["_token_event_logged"] = True
1779 results.append(traced_item)
1780 yield item
1781 except StopIteration as e:
1782 return e.value
1785async def _process_async_iterator(
1786 generator: AsyncIterator[T],
1787 run_container: _TraceableContainer,
1788 *,
1789 is_llm_run: bool,
1790 accepts_context: bool,
1791 results: list[Any],
1792 process_chunk: Optional[Callable],
1793 otel_context_manager: Optional[Any] = None,
1794) -> AsyncGenerator[T, None]:
1795 try:
1796 while True:
1797 if otel_context_manager:
1798 # Create a fresh context manager for each iteration
1799 async def anext_with_otel_context():
1800 # Get the run_tree from run_container to create a fresh context
1801 run_tree = run_container.get("new_run")
1802 if run_tree:
1803 fresh_otel_context = _maybe_create_otel_context(run_tree)
1804 if fresh_otel_context:
1805 with fresh_otel_context:
1806 return await aitertools.py_anext(generator)
1807 return await aitertools.py_anext(generator)
1809 if accepts_context:
1810 item = await asyncio.create_task( # type: ignore[call-arg, var-annotated]
1811 anext_with_otel_context(),
1812 context=run_container["context"],
1813 )
1814 else:
1815 # Python < 3.11
1816 with tracing_context(
1817 **get_tracing_context(run_container["context"])
1818 ):
1819 item = await anext_with_otel_context()
1820 else:
1821 if accepts_context:
1822 item = await asyncio.create_task( # type: ignore[call-arg, var-annotated]
1823 aitertools.py_anext(generator), # type: ignore[arg-type]
1824 context=run_container["context"],
1825 )
1826 else:
1827 # Python < 3.11
1828 with tracing_context(
1829 **get_tracing_context(run_container["context"])
1830 ):
1831 item = await aitertools.py_anext(generator)
1832 if process_chunk:
1833 traced_item = process_chunk(item)
1834 else:
1835 traced_item = item
1836 if (
1837 is_llm_run
1838 and run_container["new_run"]
1839 and not run_container.get("_token_event_logged")
1840 ):
1841 run_container["new_run"].add_event(
1842 {
1843 "name": "new_token",
1844 "time": datetime.datetime.now(
1845 datetime.timezone.utc
1846 ).isoformat(),
1847 "kwargs": {"token": traced_item},
1848 }
1849 )
1850 run_container["_token_event_logged"] = True
1851 results.append(traced_item)
1852 yield item
1853 except StopAsyncIteration:
1854 pass
1857T = TypeVar("T")
1860class _TracedStreamBase(Generic[T]):
1861 """Base class for traced stream objects."""
1863 def __init__(
1864 self,
1865 stream: Union[Iterator[T], AsyncIterator[T]],
1866 trace_container: _TraceableContainer,
1867 reduce_fn: Optional[Callable] = None,
1868 ):
1869 self.__ls_stream__ = stream
1870 self.__ls_trace_container__ = trace_container
1871 self.__ls_completed__ = False
1872 self.__ls_reduce_fn__ = reduce_fn
1873 self.__ls_accumulated_output__: list[T] = []
1874 self.__is_llm_run__ = (
1875 trace_container["new_run"].run_type == "llm"
1876 if trace_container["new_run"]
1877 else False
1878 )
1880 def __getattr__(self, name: str):
1881 return getattr(self.__ls_stream__, name)
1883 def __dir__(self):
1884 return list(set(dir(self.__class__) + dir(self.__ls_stream__)))
1886 def __repr__(self):
1887 return f"Traceable({self.__ls_stream__!r})"
1889 def __str__(self):
1890 return str(self.__ls_stream__)
1892 def __del__(self):
1893 try:
1894 if not self.__ls_completed__:
1895 self._end_trace()
1896 except BaseException:
1897 pass
1898 try:
1899 self.__ls_stream__.__del__()
1900 except BaseException:
1901 pass
1903 def _end_trace(self, error: Optional[BaseException] = None):
1904 if self.__ls_completed__:
1905 return
1906 try:
1907 if self.__ls_reduce_fn__:
1908 reduced_output = self.__ls_reduce_fn__(self.__ls_accumulated_output__)
1909 else:
1910 reduced_output = self.__ls_accumulated_output__
1911 _container_end(
1912 self.__ls_trace_container__,
1913 outputs=reduced_output,
1914 error=error,
1915 )
1916 finally:
1917 self.__ls_completed__ = True
1920class _TracedStream(_TracedStreamBase, Generic[T]):
1921 """A wrapper for synchronous stream objects that handles tracing."""
1923 def __init__(
1924 self,
1925 stream: Iterator[T],
1926 trace_container: _TraceableContainer,
1927 reduce_fn: Optional[Callable] = None,
1928 process_chunk: Optional[Callable] = None,
1929 ):
1930 super().__init__(
1931 stream=stream,
1932 trace_container=trace_container,
1933 reduce_fn=reduce_fn,
1934 )
1935 self.__ls_stream__ = stream
1936 self.__ls__gen__ = _process_iterator(
1937 self.__ls_stream__,
1938 self.__ls_trace_container__,
1939 is_llm_run=self.__is_llm_run__,
1940 results=self.__ls_accumulated_output__,
1941 process_chunk=process_chunk,
1942 )
1944 def __next__(self) -> T:
1945 try:
1946 return next(self.__ls__gen__)
1947 except StopIteration:
1948 self._end_trace()
1949 raise
1951 def __iter__(self) -> Iterator[T]:
1952 try:
1953 yield from self.__ls__gen__
1954 except BaseException as e:
1955 _cleanup_traceback(e)
1956 self._end_trace(error=e)
1957 raise
1958 else:
1959 self._end_trace()
1961 def __enter__(self):
1962 self.__ls_stream__.__enter__()
1963 return self
1965 def __exit__(self, exc_type, exc_val, exc_tb):
1966 try:
1967 return self.__ls_stream__.__exit__(exc_type, exc_val, exc_tb)
1968 finally:
1969 self._end_trace(error=exc_val if exc_type else None)
1972class _TracedAsyncStream(_TracedStreamBase, Generic[T]):
1973 """A wrapper for asynchronous stream objects that handles tracing."""
1975 def __init__(
1976 self,
1977 stream: AsyncIterator[T],
1978 trace_container: _TraceableContainer,
1979 reduce_fn: Optional[Callable] = None,
1980 process_chunk: Optional[Callable] = None,
1981 ):
1982 super().__init__(
1983 stream=stream,
1984 trace_container=trace_container,
1985 reduce_fn=reduce_fn,
1986 )
1987 self.__ls_stream__ = stream
1988 self.__ls_gen = _process_async_iterator(
1989 generator=self.__ls_stream__,
1990 run_container=self.__ls_trace_container__,
1991 is_llm_run=self.__is_llm_run__,
1992 accepts_context=aitertools.asyncio_accepts_context(),
1993 results=self.__ls_accumulated_output__,
1994 process_chunk=process_chunk,
1995 )
1997 async def _aend_trace(self, error: Optional[BaseException] = None):
1998 ctx = copy_context()
1999 await asyncio.shield(
2000 aitertools.aio_to_thread(self._end_trace, error, __ctx=ctx)
2001 )
2002 _set_tracing_context(get_tracing_context(ctx))
2004 async def __anext__(self) -> T:
2005 try:
2006 return cast(T, await aitertools.py_anext(self.__ls_gen))
2007 except StopAsyncIteration:
2008 await self._aend_trace()
2009 raise
2011 async def __aiter__(self) -> AsyncIterator[T]:
2012 try:
2013 async for item in self.__ls_gen:
2014 yield item
2015 except BaseException:
2016 await self._aend_trace()
2017 raise
2018 else:
2019 await self._aend_trace()
2021 async def __aenter__(self):
2022 await self.__ls_stream__.__aenter__()
2023 return self
2025 async def __aexit__(self, exc_type, exc_val, exc_tb):
2026 try:
2027 return await self.__ls_stream__.__aexit__(exc_type, exc_val, exc_tb)
2028 finally:
2029 await self._aend_trace()
2032def _get_function_result(results: list, reduce_fn: Callable) -> Any:
2033 if results:
2034 if reduce_fn is not None:
2035 try:
2036 return reduce_fn(results)
2037 except BaseException as e:
2038 LOGGER.error(e)
2039 return results
2040 else:
2041 return results
2044def _cleanup_traceback(e: BaseException):
2045 tb_ = e.__traceback__
2046 if tb_:
2047 while tb_.tb_next is not None and tb_.tb_frame.f_code.co_filename.endswith(
2048 _EXCLUDED_FRAME_FNAME
2049 ):
2050 tb_ = tb_.tb_next
2051 e.__traceback__ = tb_
2054def _is_otel_available() -> bool:
2055 """Cache for otel import check."""
2056 global _OTEL_AVAILABLE
2057 if _OTEL_AVAILABLE is None:
2058 try:
2059 import opentelemetry.trace # noqa: F401
2061 _OTEL_AVAILABLE = True
2062 except ImportError:
2063 _OTEL_AVAILABLE = False
2064 return _OTEL_AVAILABLE
2067def _maybe_create_otel_context(run_tree: Optional[run_trees.RunTree]):
2068 """Create OpenTelemetry context manager if OTEL is enabled and available.
2070 Args:
2071 run_tree: The current run tree.
2073 Returns:
2074 Context manager for use_span or None if not available.
2075 """
2076 if not run_tree or not utils.is_env_var_truish("OTEL_ENABLED"):
2077 return None
2078 if not _is_otel_available():
2079 return None
2081 from opentelemetry.trace import ( # type: ignore[import]
2082 NonRecordingSpan,
2083 SpanContext,
2084 TraceFlags,
2085 TraceState,
2086 use_span,
2087 )
2089 from langsmith._internal._otel_utils import (
2090 get_otel_span_id_from_uuid,
2091 get_otel_trace_id_from_uuid,
2092 )
2094 trace_id = get_otel_trace_id_from_uuid(run_tree.trace_id)
2095 span_id = get_otel_span_id_from_uuid(run_tree.id)
2097 span_context = SpanContext(
2098 trace_id=trace_id,
2099 span_id=span_id,
2100 is_remote=False,
2101 trace_flags=TraceFlags(TraceFlags.SAMPLED),
2102 trace_state=TraceState(),
2103 )
2105 non_recording_span = NonRecordingSpan(span_context)
2106 return use_span(non_recording_span)
2109# For backwards compatibility
2110_PROJECT_NAME = _context._PROJECT_NAME
2111_TAGS = _context._TAGS
2112_METADATA = _context._METADATA
2113_TRACING_ENABLED = _context._TRACING_ENABLED
2114_CLIENT = _context._CLIENT
2115_PARENT_RUN_TREE = _context._PARENT_RUN_TREE