Coverage for langsmith/run_helpers.py: 23%

791 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-12-11 16:15 -0800

1"""Decorator for creating a run tree from functions.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import contextlib 

7import contextvars 

8import datetime 

9import functools 

10import inspect 

11import logging 

12import warnings 

13from collections.abc import ( 

14 AsyncGenerator, 

15 AsyncIterator, 

16 Awaitable, 

17 Generator, 

18 Iterator, 

19 Mapping, 

20 Sequence, 

21) 

22from contextvars import copy_context 

23from typing import ( 

24 TYPE_CHECKING, 

25 Annotated, 

26 Any, 

27 Callable, 

28 Generic, 

29 Literal, 

30 Optional, 

31 Protocol, 

32 TypedDict, 

33 TypeVar, 

34 Union, 

35 cast, 

36 get_type_hints, 

37 overload, 

38 runtime_checkable, 

39) 

40 

41from typing_extensions import ParamSpec, TypeGuard, get_args, get_origin 

42 

43import langsmith._internal._context as _context 

44from langsmith import client as ls_client 

45from langsmith import run_trees, schemas, utils 

46from langsmith._internal import _aiter as aitertools 

47from langsmith.env import _runtime_env 

48from langsmith.run_trees import WriteReplica 

49 

50if TYPE_CHECKING: 

51 from types import TracebackType 

52 

53 from langchain_core.runnables import Runnable 

54 

55LOGGER = logging.getLogger(__name__) 

56_CONTEXT_KEYS: dict[str, contextvars.ContextVar] = { 

57 "parent": _context._PARENT_RUN_TREE, 

58 "project_name": _context._PROJECT_NAME, 

59 "tags": _context._TAGS, 

60 "metadata": _context._METADATA, 

61 "enabled": _context._TRACING_ENABLED, 

62 "client": _context._CLIENT, 

63 "replicas": run_trees._REPLICAS, 

64 "distributed_parent_id": run_trees._DISTRIBUTED_PARENT_ID, 

65} 

66 

67_EXCLUDED_FRAME_FNAME = "langsmith/run_helpers.py" 

68 

69_OTEL_AVAILABLE: Optional[bool] = None 

70 

71 

def get_current_run_tree() -> Optional[run_trees.RunTree]:
    """Return the run tree held by the parent-run context variable.

    Returns:
        Whatever `_context._PARENT_RUN_TREE` currently holds.
    """
    active_run = _context._PARENT_RUN_TREE.get()
    return active_run

75 

76 

def set_run_metadata(**metadata: Any) -> None:
    """Merge the given key/value pairs into the current run tree's metadata.

    When no run tree is active, a warning is logged and nothing is updated.

    Args:
        **metadata: Entries to add to (or overwrite in) the run's metadata.
    """
    current = get_current_run_tree()
    if current is not None:
        current.metadata.update(metadata)
        return
    LOGGER.warning(
        "No active run tree found. Call `set_run_metadata` inside a traced run."
    )

87 

88 

def get_tracing_context(
    context: Optional[contextvars.Context] = None,
) -> dict[str, Any]:
    """Snapshot the tracing-related context variables as a plain dict.

    Args:
        context: A specific `contextvars.Context` to read from. When omitted,
            the variables are read from the currently active context.

    Returns:
        A dict with the keys `parent`, `project_name`, `tags`, `metadata`,
        `enabled`, `client`, `replicas` and `distributed_parent_id`.
    """
    if context is not None:
        # Look each tracked variable up in the supplied context object.
        return {key: context.get(var) for key, var in _CONTEXT_KEYS.items()}
    return {
        "parent": _context._PARENT_RUN_TREE.get(),
        "project_name": _context._PROJECT_NAME.get(),
        "tags": _context._TAGS.get(),
        "metadata": _context._METADATA.get(),
        "enabled": _context._TRACING_ENABLED.get(),
        "client": _context._CLIENT.get(),
        "replicas": run_trees._REPLICAS.get(),
        "distributed_parent_id": run_trees._DISTRIBUTED_PARENT_ID.get(),
    }

105 

106 

@contextlib.contextmanager
def tracing_context(
    *,
    project_name: Optional[str] = None,
    tags: Optional[list[str]] = None,
    metadata: Optional[dict[str, Any]] = None,
    parent: Optional[Union[run_trees.RunTree, Mapping, str, Literal[False]]] = None,
    enabled: Optional[Union[bool, Literal["local"]]] = None,
    client: Optional[ls_client.Client] = None,
    replicas: Optional[Sequence[WriteReplica]] = None,
    distributed_parent_id: Optional[str] = None,
    **kwargs: Any,
) -> Generator[None, None, None]:
    """Temporarily override the tracing context for the enclosed block.

    The context captured on entry is restored on exit, including when the
    block raises.

    Args:
        project_name: The name of the project to log the run to.
        tags: The tags to add to the run.
        metadata: The metadata to add to the run.
        parent: The parent run to use for the context.

            Can be a Run/`RunTree` object, request headers (for distributed
            tracing), or the dotted order string. Passing `False` clears the
            parent for the block.
        enabled: Whether tracing is enabled.

            When `None`, the value already present in the current tracing
            context is kept.
        client: The client to use for logging the run to LangSmith.
        replicas: A sequence of `WriteReplica` dictionaries to send runs to.

            Example: `[{"api_url": "https://api.example.com", "api_key": "key", "project_name": "proj"}]`
            or `[{"project_name": "my_experiment", "updates": {"reference_example_id": None}}]`
        distributed_parent_id: The distributed parent ID for distributed
            tracing. When `None` and a parent run was resolved, the parent
            run's id is used and the parent's tags and metadata are merged in.
        **kwargs: Unrecognized arguments; any value triggers a
            `DeprecationWarning`. A `parent_run` entry is used as a fallback
            when `parent` is falsy.
    """
    if kwargs:
        # warn
        warnings.warn(
            f"Unrecognized keyword arguments: {kwargs}.",
            DeprecationWarning,
        )
    previous_context = get_tracing_context()

    if parent is False:
        parent_run = None
    else:
        parent_run = _get_parent_run({"parent": parent or kwargs.get("parent_run")})

    effective_parent_id = distributed_parent_id
    if effective_parent_id is None and parent_run is not None:
        # TODO(angus): decide if we want to merge tags and metadata
        tags = sorted(set(tags or []) | set(parent_run.tags or []))
        metadata = {**parent_run.metadata, **(metadata or {})}
        effective_parent_id = parent_run.id  # type: ignore[assignment]

    if enabled is None:
        enabled = previous_context.get("enabled")

    overrides = {
        "parent": parent_run,
        "project_name": project_name,
        "tags": tags,
        "metadata": metadata,
        "enabled": enabled,
        "client": client,
        "replicas": replicas,
        "distributed_parent_id": effective_parent_id,
    }
    _set_tracing_context(overrides)
    try:
        yield
    finally:
        _set_tracing_context(previous_context)

175 

176 

177# Alias for backwards compatibility 

178get_run_tree_context = get_current_run_tree 

179 

180 

def is_traceable_function(func: Any) -> TypeGuard[SupportsLangsmithExtra[P, R]]:
    """Report whether `func` is `@traceable` decorated.

    Three forms are checked in order: the object itself, the callable wrapped
    by a `functools.partial`, and the object's `__call__` attribute.
    """
    direct = _is_traceable_function(func)
    if direct:
        return direct
    via_partial = isinstance(func, functools.partial) and _is_traceable_function(
        func.func
    )
    if via_partial:
        return via_partial
    return hasattr(func, "__call__") and _is_traceable_function(func.__call__)

188 

189 

def ensure_traceable(
    func: Callable[P, R],
    *,
    name: Optional[str] = None,
    metadata: Optional[Mapping[str, Any]] = None,
    tags: Optional[list[str]] = None,
    client: Optional[ls_client.Client] = None,
    reduce_fn: Optional[Callable[[Sequence], Union[dict, str]]] = None,
    project_name: Optional[str] = None,
    process_inputs: Optional[Callable[[dict], dict]] = None,
    process_outputs: Optional[Callable[..., dict]] = None,
    process_chunk: Optional[Callable] = None,
) -> SupportsLangsmithExtra[P, R]:
    """Return `func` unchanged if already traceable, otherwise wrap it.

    The keyword arguments are forwarded to `traceable` only when wrapping is
    needed; they are ignored for a function that is already traceable.
    """
    if is_traceable_function(func):
        return func
    make_traceable = traceable(
        name=name,
        metadata=metadata,
        tags=tags,
        client=client,
        reduce_fn=reduce_fn,
        project_name=project_name,
        process_inputs=process_inputs,
        process_outputs=process_outputs,
        process_chunk=process_chunk,
    )
    return make_traceable(func)

217 

218 

def is_async(func: Callable) -> bool:
    """Tell whether `func`, or the function it wraps, is a coroutine function.

    Only one level of `__wrapped__` is inspected.
    """
    if inspect.iscoroutinefunction(func):
        return True
    if not hasattr(func, "__wrapped__"):
        return False
    return inspect.iscoroutinefunction(func.__wrapped__)

224 

225 

class LangSmithExtra(TypedDict, total=False):
    """Any additional info to be injected into the run dynamically.

    Declared with `total=False`, so every key may be omitted. The wrappers
    produced by `traceable` accept a value of this type through their
    `langsmith_extra` keyword argument.
    """

    name: Optional[str]
    """Optional name for the run."""
    reference_example_id: Optional[ls_client.ID_TYPE]
    """Optional ID of a reference example."""
    run_extra: Optional[dict]
    """Optional additional run information."""
    parent: Optional[Union[run_trees.RunTree, str, Mapping]]
    """Optional parent run, can be a RunTree, string, or mapping."""
    run_tree: Optional[run_trees.RunTree]  # TODO: Deprecate
    """Optional run tree (deprecated)."""
    project_name: Optional[str]
    """Optional name of the project."""
    metadata: Optional[dict[str, Any]]
    """Optional metadata for the run."""
    tags: Optional[list[str]]
    """Optional list of tags for the run."""
    run_id: Optional[ls_client.ID_TYPE]
    """Optional ID for the run."""
    client: Optional[ls_client.Client]
    """Optional LangSmith client."""
    # Optional callback function to be called if the run succeeds and before it is sent.
    # The leading underscore marks this key as non-public.
    _on_success: Optional[Callable[[run_trees.RunTree], None]]
    on_end: Optional[Callable[[run_trees.RunTree], Any]]
    """Optional callback function to be called after the run ends and is sent."""

253 

254 

255R = TypeVar("R", covariant=True) 

256P = ParamSpec("P") 

257 

258 

@runtime_checkable
class SupportsLangsmithExtra(Protocol, Generic[P, R]):
    """Implementations of this Protocol accept an optional langsmith_extra parameter.

    Generic over the wrapped callable's parameters (`P`) and its return type
    (`R`). It is the declared return type of `traceable` and
    `ensure_traceable`.
    """

    def __call__(  # type: ignore[valid-type]
        self,
        *args: P.args,
        langsmith_extra: Optional[LangSmithExtra] = None,
        **kwargs: P.kwargs,
    ) -> R:
        """Call the instance when it is called as a function.

        Args:
            *args: Variable length argument list.
            langsmith_extra: Optional dictionary containing additional
                parameters specific to Langsmith.
            **kwargs: Arbitrary keyword arguments.

        Returns:
            R: The return value of the method.

        """
        ...

282 

283 

284def _extract_usage( 

285 *, 

286 run_tree: run_trees.RunTree, 

287 outputs: Optional[dict] = None, 

288 **kwargs: Any, 

289) -> Optional[schemas.ExtractedUsageMetadata]: 

290 from_metadata = (run_tree.metadata or {}).get("usage_metadata") 

291 return (outputs or {}).get("usage_metadata") or from_metadata 

292 

293 

# Typing overload for bare usage: `@traceable` applied directly to a function.
@overload
def traceable(
    func: Callable[P, R],
) -> SupportsLangsmithExtra[P, R]: ...


# Typing overload for factory usage: `@traceable("llm", name=...)` returns a
# decorator that is then applied to the function.
@overload
def traceable(
    run_type: ls_client.RUN_TYPE_T = "chain",
    *,
    name: Optional[str] = None,
    metadata: Optional[Mapping[str, Any]] = None,
    tags: Optional[list[str]] = None,
    client: Optional[ls_client.Client] = None,
    reduce_fn: Optional[Callable[[Sequence], Union[dict, str]]] = None,
    project_name: Optional[str] = None,
    process_inputs: Optional[Callable[[dict], dict]] = None,
    process_outputs: Optional[Callable[..., dict]] = None,
    process_chunk: Optional[Callable] = None,
    _invocation_params_fn: Optional[Callable[[dict], dict]] = None,
    dangerously_allow_filesystem: bool = False,
) -> Callable[[Callable[P, R]], SupportsLangsmithExtra[P, R]]: ...

316 

317 

318def traceable( 

319 *args: Any, 

320 **kwargs: Any, 

321) -> Union[Callable, Callable[[Callable], Callable]]: 

322 """Trace a function with langsmith. 

323 

324 Args: 

325 run_type: The type of run (span) to create. 

326 

327 Examples: `llm`, `chain`, `tool`, `prompt`, `retriever`, etc. 

328 

329 Defaults to "chain". 

330 name: The name of the run. Defaults to the function name. 

331 metadata: The metadata to add to the run. Defaults to `None`. 

332 tags: The tags to add to the run. Defaults to `None`. 

333 client: The client to use for logging the run to LangSmith. Defaults to 

334 `None`, which will use the default client. 

335 reduce_fn: A function to reduce the output of the function if the function 

336 returns a generator. 

337 

338 Defaults to `None`, which means the values will be logged as a list. 

339 

340 !!! note 

341 

342 If the iterator is never exhausted (e.g. the function returns an 

343 infinite generator), this will never be called, and the run itself will 

344 be stuck in a pending state. 

345 project_name: The name of the project to log the run to. 

346 

347 Defaults to `None`, which will use the default project. 

348 process_inputs: Custom serialization / processing function for inputs. 

349 

350 Defaults to `None`. 

351 process_outputs: Custom serialization / processing function for outputs. 

352 

353 Defaults to `None`. 

354 dangerously_allow_filesystem: Whether to allow filesystem access for attachments. 

355 

356 Defaults to `False`. 

357 

358 Traces that reference local filepaths will be uploaded to LangSmith. 

359 In general, network-hosted applications should not be using this because 

360 referenced files are usually on the user's machine, not the host machine. 

361 

362 Returns: 

363 The decorated function. 

364 

365 !!! note 

366 

367 Requires that `LANGSMITH_TRACING_V2` be set to 'true' in the environment. 

368 

369 Examples: 

370 !!! example "Basic usage" 

371 

372 ```python 

373 @traceable 

374 def my_function(x: float, y: float) -> float: 

375 return x + y 

376 

377 

378 my_function(5, 6) 

379 

380 

381 @traceable 

382 async def my_async_function(query_params: dict) -> dict: 

383 async with httpx.AsyncClient() as http_client: 

384 response = await http_client.get( 

385 "https://api.example.com/data", 

386 params=query_params, 

387 ) 

388 return response.json() 

389 

390 

391 asyncio.run(my_async_function({"param": "value"})) 

392 ``` 

393 

394 !!! example "Streaming data with a generator" 

395 

396 ```python 

397 @traceable 

398 def my_generator(n: int) -> Iterable: 

399 for i in range(n): 

400 yield i 

401 

402 

403 for item in my_generator(5): 

404 print(item) 

405 ``` 

406 

407 !!! example "Async streaming data" 

408 

409 ```python 

410 @traceable 

411 async def my_async_generator(query_params: dict) -> Iterable: 

412 async with httpx.AsyncClient() as http_client: 

413 response = await http_client.get( 

414 "https://api.example.com/data", 

415 params=query_params, 

416 ) 

417 for item in response.json(): 

418 yield item 

419 

420 

421 async def async_code(): 

422 async for item in my_async_generator({"param": "value"}): 

423 print(item) 

424 

425 

426 asyncio.run(async_code()) 

427 ``` 

428 

429 !!! example "Specifying a run type and name" 

430 

431 ```python 

432 @traceable(name="CustomName", run_type="tool") 

433 def another_function(a: float, b: float) -> float: 

434 return a * b 

435 

436 

437 another_function(5, 6) 

438 ``` 

439 

440 !!! example "Logging with custom metadata and tags" 

441 

442 ```python 

443 @traceable( 

444 metadata={"version": "1.0", "author": "John Doe"}, tags=["beta", "test"] 

445 ) 

446 def tagged_function(x): 

447 return x**2 

448 

449 

450 tagged_function(5) 

451 ``` 

452 

453 !!! example "Specifying a custom client and project name" 

454 

455 ```python 

456 custom_client = Client(api_key="your_api_key") 

457 

458 

459 @traceable(client=custom_client, project_name="My Special Project") 

460 def project_specific_function(data): 

461 return data 

462 

463 

464 project_specific_function({"data": "to process"}) 

465 ``` 

466 

467 !!! example "Manually passing `langsmith_extra`" 

468 

469 ```python 

470 @traceable 

471 def manual_extra_function(x): 

472 return x**2 

473 

474 

475 manual_extra_function(5, langsmith_extra={"metadata": {"version": "1.0"}}) 

476 ``` 

477 """ 

478 run_type = cast( 

479 ls_client.RUN_TYPE_T, 

480 ( 

481 args[0] 

482 if args and isinstance(args[0], str) 

483 else (kwargs.pop("run_type", None) or "chain") 

484 ), 

485 ) 

486 if run_type not in _VALID_RUN_TYPES: 

487 warnings.warn( 

488 f"Unrecognized run_type: {run_type}. Must be one of: {_VALID_RUN_TYPES}." 

489 f" Did you mean @traceable(name='{run_type}')?" 

490 ) 

491 if len(args) > 1: 

492 warnings.warn( 

493 "The `traceable()` decorator only accepts one positional argument, " 

494 "which should be the run_type. All other arguments should be passed " 

495 "as keyword arguments." 

496 ) 

497 if "extra" in kwargs: 

498 warnings.warn( 

499 "The `extra` keyword argument is deprecated. Please use `metadata` " 

500 "instead.", 

501 DeprecationWarning, 

502 ) 

503 reduce_fn = kwargs.pop("reduce_fn", None) 

504 container_input = _ContainerInput( 

505 # TODO: Deprecate raw extra 

506 extra_outer=kwargs.pop("extra", None), 

507 name=kwargs.pop("name", None), 

508 metadata=kwargs.pop("metadata", None), 

509 tags=kwargs.pop("tags", None), 

510 client=kwargs.pop("client", None), 

511 project_name=kwargs.pop("project_name", None), 

512 run_type=run_type, 

513 process_inputs=kwargs.pop("process_inputs", None), 

514 process_chunk=kwargs.pop("process_chunk", None), 

515 invocation_params_fn=kwargs.pop("_invocation_params_fn", None), 

516 dangerously_allow_filesystem=kwargs.pop("dangerously_allow_filesystem", False), 

517 ) 

518 outputs_processor = kwargs.pop("process_outputs", None) 

519 _on_run_end = functools.partial( 

520 _handle_container_end, 

521 outputs_processor=outputs_processor, 

522 ) 

523 

524 if kwargs: 

525 warnings.warn( 

526 f"The following keyword arguments are not recognized and will be ignored: " 

527 f"{sorted(kwargs.keys())}.", 

528 DeprecationWarning, 

529 ) 

530 

531 def decorator(func: Callable): 

532 func_sig = inspect.signature(func) 

533 func_accepts_parent_run = func_sig.parameters.get("run_tree", None) is not None 

534 func_accepts_config = func_sig.parameters.get("config", None) is not None 

535 

@functools.wraps(func)
async def async_wrapper(
    *args: Any,
    langsmith_extra: Optional[LangSmithExtra] = None,
    **kwargs: Any,
) -> Any:
    """Async version of wrapper function.

    Sets up a run, awaits the wrapped coroutine function inside the run's
    context, then reports either the result or the raised exception through
    `_on_run_end` before returning / re-raising.
    """
    # Drop `config` when the wrapped function's signature has no such parameter.
    if not func_accepts_config:
        kwargs.pop("config", None)
    # Run setup is executed off the event loop thread.
    run_container = await aitertools.aio_to_thread(
        _setup_run,
        func,
        container_input=container_input,
        langsmith_extra=langsmith_extra,
        args=args,
        kwargs=kwargs,
    )

    try:
        accepts_context = aitertools.asyncio_accepts_context()
        # Inject the new run only if the function declares a `run_tree` parameter.
        if func_accepts_parent_run:
            kwargs["run_tree"] = run_container["new_run"]

        otel_context_manager = _maybe_create_otel_context(
            run_container["new_run"]
        )
        if otel_context_manager:

            async def run_with_otel_context():
                # Keep the OTEL context manager open for the whole awaited call.
                with otel_context_manager:
                    return await func(*args, **kwargs)

            if accepts_context:
                # Run as a task bound to the run's contextvars context.
                function_result = await asyncio.create_task(  # type: ignore[call-arg]
                    run_with_otel_context(), context=run_container["context"]
                )
            else:
                # Python < 3.11
                with tracing_context(
                    **get_tracing_context(run_container["context"])
                ):
                    function_result = await run_with_otel_context()
        else:
            # NOTE: the coroutine object is created here, before it is handed
            # to the task / tracing context below.
            fr_coro = func(*args, **kwargs)
            if accepts_context:
                function_result = await asyncio.create_task(  # type: ignore[call-arg]
                    fr_coro, context=run_container["context"]
                )
            else:
                # Python < 3.11
                with tracing_context(
                    **get_tracing_context(run_container["context"])
                ):
                    function_result = await fr_coro
    except BaseException as e:
        # shield from cancellation, given we're catching all exceptions
        _cleanup_traceback(e)
        await asyncio.shield(
            aitertools.aio_to_thread(_on_run_end, run_container, error=e)
        )
        raise
    # Success path: record the outputs, then hand the result back to the caller.
    await aitertools.aio_to_thread(
        _on_run_end, run_container, outputs=function_result
    )
    return function_result

601 

@functools.wraps(func)
async def async_generator_wrapper(
    *args: Any, langsmith_extra: Optional[LangSmithExtra] = None, **kwargs: Any
) -> AsyncGenerator:
    """Trace an async generator function, yielding its items as they arrive.

    Items are collected in `results` while being re-yielded; when iteration
    finishes (or fails) the collected items are passed through
    `_get_function_result(results, reduce_fn)` and reported via `_on_run_end`.
    """
    # Drop `config` when the wrapped function's signature has no such parameter.
    if not func_accepts_config:
        kwargs.pop("config", None)
    # Run setup is executed off the event loop thread.
    run_container = await aitertools.aio_to_thread(
        _setup_run,
        func,
        container_input=container_input,
        langsmith_extra=langsmith_extra,
        args=args,
        kwargs=kwargs,
    )
    results: list[Any] = []
    try:
        if func_accepts_parent_run:
            kwargs["run_tree"] = run_container["new_run"]
            # TODO: Nesting is ambiguous if a nested traceable function is only
            # called mid-generation. Need to explicitly accept run_tree to get
            # around this.

        otel_context_manager = _maybe_create_otel_context(
            run_container["new_run"]
        )

        async_gen_result = func(*args, **kwargs)
        # Can't iterate through if it's a coroutine
        accepts_context = aitertools.asyncio_accepts_context()
        if inspect.iscoroutine(async_gen_result):
            # Await the coroutine first; its result is what gets iterated below.
            if accepts_context:
                async_gen_result = await asyncio.create_task(
                    async_gen_result, context=run_container["context"]
                )  # type: ignore
            else:
                # Python < 3.11
                with tracing_context(
                    **get_tracing_context(run_container["context"])
                ):
                    async_gen_result = await async_gen_result

        async for item in _process_async_iterator(
            generator=async_gen_result,
            run_container=run_container,
            # `new_run` may be falsy, in which case this is not treated as an LLM run.
            is_llm_run=(
                run_container["new_run"].run_type == "llm"
                if run_container["new_run"]
                else False
            ),
            accepts_context=accepts_context,
            results=results,
            process_chunk=container_input.get("process_chunk"),
            otel_context_manager=otel_context_manager,
        ):
            yield item
    except BaseException as e:
        _cleanup_traceback(e)
        # Shielded so the error (plus whatever was collected so far) is still
        # reported if this coroutine is being cancelled.
        await asyncio.shield(
            aitertools.aio_to_thread(
                _on_run_end,
                run_container,
                error=e,
                outputs=_get_function_result(results, reduce_fn),
            )
        )
        raise
    # Success path: report the reduced results once the generator is exhausted.
    await aitertools.aio_to_thread(
        _on_run_end,
        run_container,
        outputs=_get_function_result(results, reduce_fn),
    )

673 

@functools.wraps(func)
def wrapper(
    *args: Any,
    langsmith_extra: Optional[LangSmithExtra] = None,
    **kwargs: Any,
) -> Any:
    """Create a new run or create_child() if run is passed in kwargs.

    Runs the wrapped synchronous function inside the run's context and
    reports the result, or the raised exception, through `_on_run_end`.

    Args:
        *args: Positional arguments forwarded to the wrapped function.
        langsmith_extra: Optional per-call run overrides passed to `_setup_run`.
        **kwargs: Keyword arguments forwarded to the wrapped function. A
            `config` entry is dropped when the function does not declare it.

    Returns:
        Whatever the wrapped function returns.

    Raises:
        BaseException: Anything raised by the wrapped function is re-raised
            after the run has been ended with that error.
    """
    if not func_accepts_config:
        kwargs.pop("config", None)
    run_container = _setup_run(
        func,
        container_input=container_input,
        langsmith_extra=langsmith_extra,
        args=args,
        kwargs=kwargs,
    )
    # `func_accepts_parent_run` is computed once by the enclosing `decorator`;
    # re-running `inspect.signature(func)` on every call only added overhead.
    try:
        if func_accepts_parent_run:
            kwargs["run_tree"] = run_container["new_run"]

        otel_context_manager = _maybe_create_otel_context(
            run_container["new_run"]
        )
        if otel_context_manager:

            def run_with_otel_context():
                # Keep the OTEL context manager open for the whole call.
                with otel_context_manager:
                    return func(*args, **kwargs)

            function_result = run_container["context"].run(
                run_with_otel_context
            )
        else:
            function_result = run_container["context"].run(
                func, *args, **kwargs
            )
    except BaseException as e:
        _cleanup_traceback(e)
        _on_run_end(run_container, error=e)
        raise
    _on_run_end(run_container, outputs=function_result)
    return function_result

719 

@functools.wraps(func)
def generator_wrapper(
    *args: Any, langsmith_extra: Optional[LangSmithExtra] = None, **kwargs: Any
) -> Any:
    """Trace a synchronous generator function, re-yielding its items.

    Yielded items are collected in `results`; a non-`None` generator return
    value is appended as well. When iteration finishes (or fails) the
    collected values go through `_get_function_result(results, reduce_fn)`
    and are reported via `_on_run_end`.

    Args:
        *args: Positional arguments forwarded to the wrapped generator function.
        langsmith_extra: Optional per-call run overrides passed to `_setup_run`.
        **kwargs: Keyword arguments forwarded to the wrapped function. A
            `config` entry is dropped when the function does not declare it.

    Returns:
        The wrapped generator's return value (delivered via `StopIteration`).

    Raises:
        BaseException: Anything raised during iteration is re-raised after
            the run has been ended with that error.
    """
    if not func_accepts_config:
        kwargs.pop("config", None)
    run_container = _setup_run(
        func,
        container_input=container_input,
        langsmith_extra=langsmith_extra,
        args=args,
        kwargs=kwargs,
    )
    # `func_accepts_parent_run` is computed once by the enclosing `decorator`;
    # re-running `inspect.signature(func)` on every call only added overhead.
    results: list[Any] = []
    function_return: Any = None

    try:
        if func_accepts_parent_run:
            kwargs["run_tree"] = run_container["new_run"]

        generator_result = run_container["context"].run(func, *args, **kwargs)

        otel_context_manager = _maybe_create_otel_context(
            run_container["new_run"]
        )

        function_return = yield from _process_iterator(
            generator_result,
            run_container,
            is_llm_run=run_type == "llm",
            results=results,
            process_chunk=container_input.get("process_chunk"),
            otel_context_manager=otel_context_manager,
        )

        if function_return is not None:
            results.append(function_return)

    except BaseException as e:
        _cleanup_traceback(e)
        _on_run_end(
            run_container,
            error=e,
            outputs=_get_function_result(results, reduce_fn),
        )
        raise
    _on_run_end(run_container, outputs=_get_function_result(results, reduce_fn))

    return function_return

772 

773 # "Stream" functions (used in methods like OpenAI/Anthropic's SDKs) 

774 # are functions that return iterable responses and should not be 

775 # considered complete until the streaming is completed 

@functools.wraps(func)
def stream_wrapper(
    *args: Any, langsmith_extra: Optional[LangSmithExtra] = None, **kwargs: Any
) -> Any:
    """Trace a synchronous function whose return value may be a stream.

    An iterable (or async-iterable) result is wrapped so the run only ends
    once streaming completes; any other result ends the run immediately.
    """
    if not func_accepts_config:
        kwargs.pop("config", None)
    container = _setup_run(
        func,
        container_input=container_input,
        langsmith_extra=langsmith_extra,
        args=args,
        kwargs=kwargs,
    )

    try:
        if func_accepts_parent_run:
            kwargs["run_tree"] = container["new_run"]
        stream = container["context"].run(func, *args, **kwargs)
    except Exception as exc:
        _cleanup_traceback(exc)
        _on_run_end(container, error=exc)
        raise

    if hasattr(stream, "__iter__"):
        return _TracedStream(stream, container, reduce_fn)
    if hasattr(stream, "__aiter__"):
        # sync function -> async iterable (unexpected)
        return _TracedAsyncStream(stream, container, reduce_fn)

    # If it's not iterable, end the trace immediately
    _on_run_end(container, outputs=stream)
    return stream

808 

@functools.wraps(func)
async def async_stream_wrapper(
    *args: Any, langsmith_extra: Optional[LangSmithExtra] = None, **kwargs: Any
) -> Any:
    """Trace an async function whose awaited result may be a stream.

    An async-iterable (or iterable) result is wrapped so the run only ends
    once streaming completes; any other result ends the run immediately.

    Args:
        *args: Positional arguments forwarded to the wrapped function.
        langsmith_extra: Optional per-call run overrides passed to `_setup_run`.
        **kwargs: Keyword arguments forwarded to the wrapped function. A
            `config` entry is dropped when the function does not declare it.

    Returns:
        A traced stream wrapper, or the raw result when it is not iterable.

    Raises:
        Exception: Anything raised while awaiting the wrapped function is
            re-raised after the run has been ended with that error.
    """
    if not func_accepts_config:
        kwargs.pop("config", None)
    trace_container = await aitertools.aio_to_thread(
        _setup_run,
        func,
        container_input=container_input,
        langsmith_extra=langsmith_extra,
        args=args,
        kwargs=kwargs,
    )

    try:
        if func_accepts_parent_run:
            kwargs["run_tree"] = trace_container["new_run"]
        stream = await func(*args, **kwargs)
    except Exception as e:
        # Match the other wrappers, which all clean the traceback before
        # reporting the error.
        _cleanup_traceback(e)
        await aitertools.aio_to_thread(_on_run_end, trace_container, error=e)
        raise

    if hasattr(stream, "__aiter__"):
        return _TracedAsyncStream(stream, trace_container, reduce_fn)
    elif hasattr(stream, "__iter__"):
        # Async function -> sync iterable
        return _TracedStream(stream, trace_container, reduce_fn)

    # If it's not iterable, end the trace immediately
    await aitertools.aio_to_thread(_on_run_end, trace_container, outputs=stream)
    return stream

841 

842 if inspect.isasyncgenfunction(func): 

843 selected_wrapper: Callable = async_generator_wrapper 

844 elif inspect.isgeneratorfunction(func): 

845 selected_wrapper = generator_wrapper 

846 elif is_async(func): 

847 if reduce_fn: 

848 selected_wrapper = async_stream_wrapper 

849 else: 

850 selected_wrapper = async_wrapper 

851 else: 

852 if reduce_fn: 

853 selected_wrapper = stream_wrapper 

854 else: 

855 selected_wrapper = wrapper 

856 setattr(selected_wrapper, "__langsmith_traceable__", True) 

857 sig = inspect.signature(selected_wrapper) 

858 if not sig.parameters.get("config"): 

859 sig = sig.replace( 

860 parameters=[ 

861 *( 

862 param 

863 for param in sig.parameters.values() 

864 if param.kind != inspect.Parameter.VAR_KEYWORD 

865 ), 

866 inspect.Parameter( 

867 "config", inspect.Parameter.KEYWORD_ONLY, default=None 

868 ), 

869 *( 

870 param 

871 for param in sig.parameters.values() 

872 if param.kind == inspect.Parameter.VAR_KEYWORD 

873 ), 

874 ] 

875 ) 

876 selected_wrapper.__signature__ = sig # type: ignore[attr-defined] 

877 return selected_wrapper 

878 

879 # If the decorator is called with no arguments, then it's being used as a 

880 # decorator, so we return the decorator function 

881 if len(args) == 1 and callable(args[0]) and not kwargs: 

882 return decorator(args[0]) 

883 # Else it's being used as a decorator factory, so we return the decorator 

884 return decorator 

885 

886 

887class trace: 

888 """Manage a LangSmith run in context. 

889 

890 This class can be used as both a synchronous and asynchronous context manager. 

891 

892 Args: 

893 name: Name of the run. 

894 run_type: Type of run (e.g., `'chain'`, `'llm'`, `'tool'`). 

895 inputs: Initial input data for the run. 

896 project_name: Project name to associate the run with. 

897 parent: Parent run. 

898 

899 Can be a `RunTree`, dotted order string, or tracing headers. 

900 tags: List of tags for the run. 

901 metadata: Additional metadata for the run. 

902 client: LangSmith client for custom settings. 

903 run_id: Preset identifier for the run. 

904 reference_example_id: Associates run with a dataset example. 

905 

906 Only for root runs in evaluation. 

907 exceptions_to_handle: Exception types to ignore. 

908 extra: Extra data to send to LangSmith. 

909 

910 Use 'metadata' instead. 

911 

912 Examples: 

913 Synchronous usage: 

914 

915 ```python 

916 with trace("My Operation", run_type="tool", tags=["important"]) as run: 

917 result = "foo" # Perform operation 

918 run.metadata["some-key"] = "some-value" 

919 run.end(outputs={"result": result}) 

920 ``` 

921 

922 Asynchronous usage: 

923 

924 ```python 

925 async def main(): 

926 async with trace("Async Operation", run_type="tool", tags=["async"]) as run: 

927 result = "foo" # Await async operation 

928 run.metadata["some-key"] = "some-value" 

929 # "end" just adds the outputs and sets error to None 

930 # The actual patching of the run happens when the context exits 

931 run.end(outputs={"result": result}) 

932 

933 

934 asyncio.run(main()) 

935 ``` 

936 

937 Handling specific exceptions: 

938 

939 ```python 

940 import pytest 

941 import sys 

942 

943 with trace("Test", exceptions_to_handle=(pytest.skip.Exception,)): 

944 if sys.platform == "win32": # Just an example 

945 pytest.skip("Skipping test for windows") 

946 result = "foo" # Perform test operation 

947 ``` 

948 """ 

949 

def __init__(
    self,
    name: str,
    run_type: ls_client.RUN_TYPE_T = "chain",
    *,
    inputs: Optional[dict] = None,
    extra: Optional[dict] = None,
    project_name: Optional[str] = None,
    parent: Optional[
        Union[run_trees.RunTree, str, Mapping, Literal["ignore"]]
    ] = None,
    tags: Optional[list[str]] = None,
    metadata: Optional[Mapping[str, Any]] = None,
    client: Optional[ls_client.Client] = None,
    run_id: Optional[ls_client.ID_TYPE] = None,
    reference_example_id: Optional[ls_client.ID_TYPE] = None,
    exceptions_to_handle: Optional[tuple[type[BaseException], ...]] = None,
    attachments: Optional[schemas.Attachments] = None,
    **kwargs: Any,
):
    """Initialize the trace context manager.

    Only stores configuration; the run itself is created on enter.

    Args:
        name: Name of the run.
        run_type: Type of the run.
        inputs: Inputs recorded on the run.
        extra: Extra payload for the run.
        project_name: Explicit project name for the run.
        parent: Parent run, dotted-order string, header mapping, or
            ``"ignore"``.
        tags: Tags for the run.
        metadata: Metadata for the run.
        client: Client to use for the run.
        run_id: Explicit ID for the run.
        reference_example_id: Example ID used when a root run is created.
        exceptions_to_handle: Exception types that are not recorded as
            errors on exit.
        attachments: Attachments recorded on the run.
        **kwargs: Legacy keyword arguments. ``_end_on_exit`` (default True)
            is consumed silently; any other key triggers a
            ``DeprecationWarning``. ``run_tree`` is still read for
            backwards compatibility even though it is warned about.
    """
    self._end_on_exit = kwargs.pop("_end_on_exit", True)
    if kwargs:
        warnings.warn(
            "The `trace` context manager no longer supports the following kwargs: "
            f"{sorted(kwargs.keys())}.",
            DeprecationWarning,
            # Attribute the warning to the code that instantiated `trace`.
            stacklevel=2,
        )
    self.name = name
    self.run_type = run_type
    self.inputs = inputs
    self.attachments = attachments
    self.extra = extra
    self.project_name = project_name
    self.parent = parent
    # The run tree is deprecated. Keeping for backwards compat.
    # Will fully merge within parent later.
    self.run_tree = kwargs.get("run_tree")
    self.tags = tags
    self.metadata = metadata
    self.client = client
    self.run_id = run_id
    self.reference_example_id = reference_example_id
    self.exceptions_to_handle = exceptions_to_handle
    # Populated on enter.
    self.new_run: Optional[run_trees.RunTree] = None
    self.old_ctx: Optional[dict] = None

999 

def _setup(self) -> run_trees.RunTree:
    """Set up the tracing context and create a new run.

    Snapshots the current tracing context, merges tags and metadata with the
    ambient (context or global) values, creates the run as a child of the
    resolved parent or as a new root run, and, when tracing is enabled,
    posts it and updates the tracing context variables.

    Returns:
        run_trees.RunTree: The newly created run.
    """
    self.old_ctx = get_tracing_context()
    enabled = utils.tracing_is_enabled(self.old_ctx)

    outer_tags = _context._TAGS.get() or _context._GLOBAL_TAGS
    outer_metadata = _context._METADATA.get() or _context._GLOBAL_METADATA
    client_ = self.client or self.old_ctx.get("client")
    parent_run_ = _get_parent_run(
        {
            "parent": self.parent,
            "run_tree": self.run_tree,
            "client": client_,
            "project_name": self.project_name,
        }
    )

    tags_ = sorted(set((self.tags or []) + (outer_tags or [])))
    # Ambient metadata overrides keys passed to this trace.
    metadata = {
        **(self.metadata or {}),
        **(outer_metadata or {}),
        "ls_method": "trace",
    }

    # Copy so the "metadata" key is not written into the caller's dict.
    extra_outer = dict(self.extra or {})
    extra_outer["metadata"] = metadata

    project_name_ = _get_project_name(self.project_name)

    if parent_run_ is not None and enabled:
        self.new_run = parent_run_.create_child(
            name=self.name,
            run_id=self.run_id,
            run_type=self.run_type,
            extra=extra_outer,
            inputs=self.inputs,
            tags=tags_,
            attachments=self.attachments,
        )
    else:
        self.new_run = run_trees.RunTree(
            name=self.name,
            id=ls_client._ensure_uuid(self.run_id),
            reference_example_id=ls_client._ensure_uuid(
                self.reference_example_id, accept_null=True
            ),
            run_type=self.run_type,
            extra=extra_outer,
            project_name=project_name_ or "default",
            replicas=run_trees._REPLICAS.get(),
            inputs=self.inputs or {},
            tags=tags_,
            client=client_,  # type: ignore
            attachments=self.attachments or {},  # type: ignore
        )

    # Posting requires `enabled` to be exactly True; the context variables
    # are updated for any truthy value.
    if enabled is True:
        self.new_run.post()
    if enabled:
        _context._TAGS.set(tags_)
        _context._METADATA.set(metadata)
        _context._PARENT_RUN_TREE.set(self.new_run)
        _context._PROJECT_NAME.set(project_name_)
        _context._CLIENT.set(client_)

    return self.new_run

1074 

def _teardown(
    self,
    exc_type: Optional[type[BaseException]],
    exc_value: Optional[BaseException],
    traceback: Optional[TracebackType],
) -> None:
    """Finalize the run and restore the previous tracing context.

    If an exception occurred, the run is ended with an error string, or with
    no error when the exception type is listed in ``exceptions_to_handle``.
    The run is patched when tracing was enabled in the saved context and
    ``_end_on_exit`` is set, and the saved context is then restored.

    Args:
        exc_type: The type of the exception that occurred, if any.
        exc_value: The exception instance that occurred, if any.
        traceback: The traceback object associated with the exception, if any.
    """
    run = self.new_run
    if run is None:
        # Setup never produced a run; nothing to finalize.
        return

    if exc_type is not None:
        expected = self.exceptions_to_handle and issubclass(
            exc_type, self.exceptions_to_handle
        )
        if expected:
            error_text = None
        else:
            formatted = utils._format_exc()
            error_text = f"{exc_type.__name__}: {exc_value}\n\n{formatted}"
        run.end(error=error_text)

    previous_ctx = self.old_ctx
    if previous_ctx is None:
        warnings.warn("Tracing context was not set up properly.", RuntimeWarning)
        return

    if utils.tracing_is_enabled(previous_ctx) is True and self._end_on_exit:
        run.patch()
    _set_tracing_context(previous_ctx)

1110 

def __enter__(self) -> run_trees.RunTree:
    """Enter the context manager synchronously.

    Delegates to `_setup`, which creates the run and updates the tracing
    context.

    Returns:
        run_trees.RunTree: The newly created run.
    """
    return self._setup()

1118 

def __exit__(
    self,
    exc_type: Optional[type[BaseException]] = None,
    exc_value: Optional[BaseException] = None,
    traceback: Optional[TracebackType] = None,
) -> None:
    """Exit the context manager synchronously.

    Delegates to `_teardown`. Always returns None, so an exception raised
    inside the ``with`` body is never suppressed.

    Args:
        exc_type: The type of the exception that occurred, if any.
        exc_value: The exception instance that occurred, if any.
        traceback: The traceback object associated with the exception, if any.
    """
    self._teardown(exc_type, exc_value, traceback)

1133 

async def __aenter__(self) -> run_trees.RunTree:
    """Enter the context manager asynchronously.

    Runs `_setup` through `aitertools.aio_to_thread` with a copy of the
    current context, then applies the tracing values found in that copy to
    the calling context.

    Returns:
        run_trees.RunTree: The newly created run.
    """
    ctx = copy_context()
    # NOTE(review): presumably `aio_to_thread` executes `_setup` inside `ctx`
    # on a worker thread, so its context-variable writes land in `ctx`
    # rather than in the caller's context — confirm against the helper.
    result = await aitertools.aio_to_thread(self._setup, __ctx=ctx)
    # Set the context for the current thread
    _set_tracing_context(get_tracing_context(ctx))
    return result

1145 

async def __aexit__(
    self,
    exc_type: Optional[type[BaseException]] = None,
    exc_value: Optional[BaseException] = None,
    traceback: Optional[TracebackType] = None,
) -> None:
    """Exit the context manager asynchronously.

    Runs `_teardown` through `aitertools.aio_to_thread` with a copy of the
    current context. When an exception is being handled, the teardown is
    wrapped in `asyncio.shield`. Afterwards the tracing values from the
    copied context are applied to the calling context.

    Args:
        exc_type: The type of the exception that occurred, if any.
        exc_value: The exception instance that occurred, if any.
        traceback: The traceback object associated with the exception, if any.
    """
    ctx = copy_context()
    teardown = aitertools.aio_to_thread(
        self._teardown, exc_type, exc_value, traceback, __ctx=ctx
    )
    if exc_type is not None:
        # Shield the teardown from cancellation while unwinding an error.
        teardown = asyncio.shield(teardown)
    await teardown
    _set_tracing_context(get_tracing_context(ctx))

1171 

1172 

def _get_project_name(project_name: Optional[str]) -> Optional[str]:
    """Resolve the project name to use for a run.

    Precedence: the explicit ``project_name`` argument, the project-name
    context variable, the current parent run's ``session_name``, the global
    project name, and finally `utils.get_tracer_project()`.
    """
    if project_name:
        return project_name

    parent_run = _PARENT_RUN_TREE.get()

    # Maintain tree consistency first
    from_context = _context._PROJECT_NAME.get()
    if from_context:
        return from_context
    from_parent = parent_run.session_name if parent_run else None
    if from_parent:
        return from_parent

    # Global fallback configured via ls.configure(...), then the default
    # for the environment.
    return _context._GLOBAL_PROJECT_NAME or utils.get_tracer_project()

1186 

1187 

def as_runnable(traceable_fn: Callable) -> Runnable:
    """Convert a function wrapped by the LangSmith `@traceable` decorator to a `Runnable`.

    Args:
        traceable_fn: The function wrapped by the `@traceable` decorator.

    Returns:
        Runnable: A `Runnable` object that maintains a consistent LangSmith
            tracing context.

    Raises:
        ImportError: If `langchain-core` is not installed.
        ValueError: If the provided function is not wrapped by the `@traceable` decorator.

    Example:
        >>> @traceable
        ... def my_function(input_data):
        ...     # Function implementation
        ...     pass
        >>> runnable = as_runnable(my_function)
    """
    # Imported lazily so langchain-core stays an optional dependency.
    try:
        from langchain_core.runnables import RunnableConfig, RunnableLambda
        from langchain_core.runnables.utils import Input, Output
    except ImportError as e:
        raise ImportError(
            "as_runnable requires langchain-core to be installed. "
            "You can install it with `pip install langchain-core`."
        ) from e
    if not is_traceable_function(traceable_fn):
        # Include the source in the error when it can be retrieved.
        try:
            fn_src = inspect.getsource(traceable_fn)
        except Exception:
            fn_src = "<source unavailable>"
        raise ValueError(
            f"as_runnable expects a function wrapped by the LangSmith"
            f" @traceable decorator. Got {traceable_fn} defined as:\n{fn_src}"
        )

    class RunnableTraceable(RunnableLambda):
        """Converts a `@traceable` decorated function to a `Runnable`.

        This helps maintain a consistent LangSmith tracing context.
        """

        def __init__(
            self,
            func: Callable,
            afunc: Optional[Callable[..., Awaitable[Output]]] = None,
        ) -> None:
            """Wrap ``func`` (and optionally ``afunc``) for use as a runnable.

            Raises:
                TypeError: If ``func`` is async and ``afunc`` is also given.
                ValueError: If ``func`` is neither async nor traceable, or if
                    an async function passed in is not traceable.
            """
            wrapped: Optional[Callable[[Input], Output]] = None
            awrapped = self._wrap_async(afunc)
            if is_async(func):
                if awrapped is not None:
                    raise TypeError(
                        "Func was provided as a coroutine function, but afunc was "
                        "also provided. If providing both, func should be a regular "
                        "function to avoid ambiguity."
                    )
                # An async `func` is wrapped with the async adapter and passed
                # to the parent class in the `func` position.
                wrapped = cast(Callable[[Input], Output], self._wrap_async(func))
            elif is_traceable_function(func):
                wrapped = cast(Callable[[Input], Output], self._wrap_sync(func))
            if wrapped is None:
                raise ValueError(
                    f"{self.__class__.__name__} expects a function wrapped by"
                    " the LangSmith"
                    f" @traceable decorator. Got {func}"
                )

            super().__init__(
                wrapped,
                cast(
                    Optional[Callable[[Input], Awaitable[Output]]],
                    awrapped,
                ),
            )

        @staticmethod
        def _wrap_sync(
            func: Callable[..., Output],
        ) -> Callable[[Input, RunnableConfig], Output]:
            """Adapt a sync traceable function to the ``(inputs, config)`` signature.

            The adapter builds a run tree from the config and passes it to
            ``func`` through ``langsmith_extra``; ``inputs`` is unpacked as
            keyword arguments.
            """

            def wrap_traceable(inputs: dict, config: RunnableConfig) -> Any:
                run_tree = run_trees.RunTree.from_runnable_config(cast(dict, config))
                return func(**inputs, langsmith_extra={"run_tree": run_tree})

            return cast(Callable[[Input, RunnableConfig], Output], wrap_traceable)

        @staticmethod
        def _wrap_async(
            afunc: Optional[Callable[..., Awaitable[Output]]],
        ) -> Optional[Callable[[Input, RunnableConfig], Awaitable[Output]]]:
            """Adapt an async traceable function to the ``(inputs, config)`` signature.

            Returns None when ``afunc`` is None. Otherwise the adapter builds a
            run tree from the config and awaits ``afunc`` with it passed
            through ``langsmith_extra``.

            Raises:
                ValueError: If ``afunc`` is not a traceable function.
            """
            if afunc is None:
                return None

            if not is_traceable_function(afunc):
                raise ValueError(
                    "RunnableTraceable expects a function wrapped by the LangSmith"
                    f" @traceable decorator. Got {afunc}"
                )
            afunc_ = cast(Callable[..., Awaitable[Output]], afunc)

            async def awrap_traceable(inputs: dict, config: RunnableConfig) -> Any:
                run_tree = run_trees.RunTree.from_runnable_config(cast(dict, config))
                return await afunc_(**inputs, langsmith_extra={"run_tree": run_tree})

            return cast(
                Callable[[Input, RunnableConfig], Awaitable[Output]], awrap_traceable
            )

    return RunnableTraceable(traceable_fn)

1301 

1302 

## Private Methods and Objects
# Set of recognized run type names.
_VALID_RUN_TYPES = {
    "chain",
    "embedding",
    "llm",
    "parser",
    "prompt",
    "retriever",
    "tool",
}

1313 

1314 

class _TraceableContainer(TypedDict, total=False):
    """State returned by `_setup_run` for a single traced call.

    It is later handed to `_container_end` to finish the run.
    """

    # The run created for this call; None when tracing is not enabled.
    new_run: Optional[run_trees.RunTree]
    # Project name selected for the run.
    project_name: Optional[str]
    # Value of the project-name context variable when setup started.
    outer_project: Optional[str]
    # Ambient metadata (context or global) seen at setup; None when disabled.
    outer_metadata: Optional[dict[str, Any]]
    # Ambient tags (context or global) seen at setup; None when disabled.
    outer_tags: Optional[list[str]]
    # Callback invoked with the run before it is ended, on the no-error path.
    _on_success: Optional[Callable[[run_trees.RunTree], Any]]
    # Callback invoked with the run after it has been ended.
    on_end: Optional[Callable[[run_trees.RunTree], Any]]
    # Copied context in which the traced call and its iterators are run.
    context: contextvars.Context
    # Set to True once the first "new_token" event has been recorded.
    _token_event_logged: Optional[bool]

1327 

1328 

class _ContainerInput(TypedDict, total=False):
    """Typed settings passed to `_setup_run` when initializing a run."""

    # Base "extra" payload for the run.
    extra_outer: Optional[dict]
    # Default run name; an invocation-time name takes precedence.
    name: Optional[str]
    # Metadata merged into the run's metadata.
    metadata: Optional[dict[str, Any]]
    # Tags appended to the run's tags.
    tags: Optional[list[str]]
    # Default client; an invocation-time client takes precedence.
    client: Optional[ls_client.Client]
    # Not read by `_setup_run`; presumably reduces streamed chunks into a
    # single output — TODO confirm against the callers.
    reduce_fn: Optional[Callable]
    # Project name used when no context, parent or invocation value applies.
    project_name: Optional[str]
    # Run type; `_setup_run` falls back to "chain" when unset.
    run_type: ls_client.RUN_TYPE_T
    # Optional transform applied to the inferred inputs before recording.
    process_inputs: Optional[Callable[[dict], dict]]
    # Not read by `_setup_run`; presumably transforms each streamed chunk
    # before it is recorded — TODO confirm against the callers.
    process_chunk: Optional[Callable]
    # Optional function whose non-None results are merged into run metadata.
    invocation_params_fn: Optional[Callable[[dict], dict]]
    # Forwarded to `RunTree` when a root run is created.
    dangerously_allow_filesystem: Optional[bool]

1344 

1345 

def _container_end(
    container: _TraceableContainer,
    outputs: Optional[Any] = None,
    error: Optional[BaseException] = None,
) -> None:
    """End the run.

    Normalizes ``outputs`` to a dict, records usage metadata when it can be
    extracted, ends the run (with an error string if ``error`` is given),
    patches it when tracing is enabled, and fires the container callbacks.

    Args:
        container: Container returned by `_setup_run`.
        outputs: Raw outputs of the traced call.
        error: Exception raised by the traced call, if any.
    """
    run_tree = container.get("new_run")
    if run_tree is None:
        # Tracing not enabled
        return
    if isinstance(outputs, dict):
        dict_outputs = outputs
    elif (
        outputs is not None
        and hasattr(outputs, "model_dump")
        and callable(outputs.model_dump)
        and not isinstance(outputs, type)
    ):
        # Instances exposing `model_dump` are serialized through it;
        # classes are excluded by the isinstance check above.
        try:
            dict_outputs = outputs.model_dump(exclude_none=True, mode="json")
        except Exception as e:
            LOGGER.debug(
                f"Failed to use model_dump to serialize {type(outputs)} to JSON: {e}"
            )
            dict_outputs = {"output": outputs}
    else:
        dict_outputs = {"output": outputs}
    if (usage := _extract_usage(run_tree=run_tree, outputs=dict_outputs)) is not None:
        run_tree.metadata["usage_metadata"] = usage
    if error:
        stacktrace = utils._format_exc()
        error_repr = f"{repr(error)}\n\n{stacktrace}"
    else:
        error_repr = None
        # The success callback runs before the run is ended; its failures
        # are reported as warnings and never propagate.
        if (_on_success := container.get("_on_success")) and callable(_on_success):
            try:
                _on_success(run_tree)
            except BaseException as e:
                warnings.warn(f"Failed to run _on_success function: {e}")

    run_tree.end(outputs=dict_outputs, error=error_repr)
    if utils.tracing_is_enabled() is True:
        run_tree.patch()
    # `on_end` runs on both the success and the error path.
    if (on_end := container.get("on_end")) and callable(on_end):
        try:
            on_end(run_tree)
        except BaseException as e:
            warnings.warn(f"Failed to run on_end function: {e}")

1394 

1395 

def _collect_extra(extra_outer: dict, langsmith_extra: LangSmithExtra) -> dict:
    """Merge the base extra payload with invocation-time ``run_extra``.

    Keys in ``langsmith_extra["run_extra"]`` override those in
    ``extra_outer``. A new dict is always returned, so callers may add keys
    to the result without modifying ``extra_outer``.

    Args:
        extra_outer: Base extra payload.
        langsmith_extra: Invocation-time options; only ``run_extra`` is read.

    Returns:
        A fresh dict containing the merged payload.
    """
    run_extra = langsmith_extra.get("run_extra", None)
    return {**extra_outer, **(run_extra or {})}

1403 

1404 

def _get_parent_run(
    langsmith_extra: LangSmithExtra,
    config: Optional[dict] = None,
) -> Optional[run_trees.RunTree]:
    """Resolve the parent run for a new run.

    Checked in order: an explicit ``parent`` (``"ignore"``, a `RunTree`, a
    header mapping, or a dotted-order string), an explicit ``run_tree``, and
    finally the current run tree, possibly replaced by a run derived from a
    LangChain runnable config.

    Args:
        langsmith_extra: Options; reads ``parent``, ``run_tree``, ``client``
            and ``project_name``.
        config: Optional runnable config, consulted only when
            langchain-core is detected.

    Returns:
        The parent run, or None when there is none or it is ignored.
    """
    parent = langsmith_extra.get("parent")
    if parent == "ignore":
        return None
    if isinstance(parent, run_trees.RunTree):
        return parent
    if isinstance(parent, Mapping):
        return run_trees.RunTree.from_headers(
            parent,
            client=langsmith_extra.get("client"),
            # Project precedence in _get_project_name:
            # explicit -> cvar -> parent run -> global -> env var.
            # NOTE(review): the headers may also influence the project
            # inside from_headers — confirm.
            project_name=_get_project_name(langsmith_extra.get("project_name")),
        )
    if isinstance(parent, str):
        dort = run_trees.RunTree.from_dotted_order(
            parent,
            client=langsmith_extra.get("client"),
            # Project precedence in _get_project_name:
            # explicit -> cvar -> parent run -> global -> env var.
            project_name=_get_project_name(langsmith_extra.get("project_name")),
        )
        return dort
    run_tree = langsmith_extra.get("run_tree")
    if run_tree:
        return run_tree
    crt = get_current_run_tree()
    if _runtime_env.get_langchain_core_version() is not None:
        if rt := run_trees.RunTree.from_runnable_config(
            config, client=langsmith_extra.get("client")
        ):
            # Still need to break ties when alternating between traceable and
            # LangChain code.
            # Nesting: LC -> LS -> LS, we want to still use LS as the parent
            # Otherwise would look like LC -> {LS, LS} (siblings)
            if (
                not crt  # Simple LC -> LS
                # Let user override if manually passed in or invoked in a
                # RunnableSequence. This is a naive check.
                or (config is not None and config.get("callbacks"))
                # If the LangChain dotted order is more nested than the LangSmith
                # dotted order, use the LangChain run as the parent.
                # Note that this condition shouldn't be triggered in later
                # versions of core, since we also update the run_tree context
                # vars when updating the RunnableConfig context var.
                or rt.dotted_order > crt.dotted_order
            ):
                return rt
    return crt

1455 

1456 

def _setup_run(
    func: Callable,
    container_input: _ContainerInput,
    langsmith_extra: Optional[LangSmithExtra] = None,
    args: Any = None,
    kwargs: Any = None,
) -> _TraceableContainer:
    """Create a new run or create_child() if run is passed in kwargs.

    Args:
        func: The traced function; used for its signature and default name.
        container_input: Decorator-level settings.
        langsmith_extra: Invocation-time overrides.
        args: Positional arguments of the call (None is treated as empty).
        kwargs: Keyword arguments of the call (None is treated as empty).

    Returns:
        A container holding the new run (or None when tracing is disabled
        and there is no parent run) plus the context the call should run in.
    """
    # The declared defaults are None; normalize so unpacking and `.get` work.
    args = () if args is None else args
    kwargs = {} if kwargs is None else kwargs
    extra_outer = container_input.get("extra_outer") or {}
    metadata = container_input.get("metadata")
    tags = container_input.get("tags")
    client = container_input.get("client")
    run_type = container_input.get("run_type") or "chain"
    dangerously_allow_filesystem = container_input.get(
        "dangerously_allow_filesystem", False
    )
    outer_project = _context._PROJECT_NAME.get()
    langsmith_extra = langsmith_extra or LangSmithExtra()
    name = langsmith_extra.get("name") or container_input.get("name")
    client_ = langsmith_extra.get("client", client) or _context._CLIENT.get()
    parent_run_ = _get_parent_run(
        {**langsmith_extra, "client": client_}, kwargs.get("config")
    )
    project_cv = _context._PROJECT_NAME.get()
    selected_project = (
        project_cv  # From parent trace
        or (
            parent_run_.session_name if parent_run_ else None
        )  # from parent run attempt 2 (not managed by traceable)
        or langsmith_extra.get("project_name")  # at invocation time
        # `.get`: the key is optional on this total=False TypedDict.
        or container_input.get("project_name")  # at decorator time
        or _context._GLOBAL_PROJECT_NAME  # global fallback from ls.configure
        or utils.get_tracer_project()  # default
    )
    reference_example_id = langsmith_extra.get("reference_example_id")
    id_ = langsmith_extra.get("run_id")
    if not parent_run_ and not utils.tracing_is_enabled():
        utils.log_once(
            logging.DEBUG,
            "LangSmith tracing is not enabled, returning original function.",
        )
        return _TraceableContainer(
            new_run=None,
            project_name=selected_project,
            outer_project=outer_project,
            outer_metadata=None,
            outer_tags=None,
            _on_success=langsmith_extra.get("_on_success"),
            on_end=langsmith_extra.get("on_end"),
            context=copy_context(),
            _token_event_logged=False,
        )
    signature = inspect.signature(func)
    name_ = name or utils._get_function_name(func)
    extra_inner = _collect_extra(extra_outer, langsmith_extra)
    outer_metadata = _context._METADATA.get() or _context._GLOBAL_METADATA
    outer_tags = _context._TAGS.get() or _context._GLOBAL_TAGS
    context = copy_context()
    metadata_ = {
        **(langsmith_extra.get("metadata") or {}),
        **(outer_metadata or {}),
    }
    context.run(_context._METADATA.set, metadata_)
    # NOTE(review): `metadata_` is the same dict object stored in the context
    # variable above, so the in-place updates below are also visible through
    # that variable. Confirm whether that is intended.
    metadata_.update(metadata or {})
    metadata_["ls_method"] = "traceable"
    extra_inner["metadata"] = metadata_
    inputs, attachments = _get_inputs_and_attachments_safe(
        signature, *args, func=func, **kwargs
    )
    invocation_params_fn = container_input.get("invocation_params_fn")
    if invocation_params_fn:
        try:
            invocation_params = {
                k: v for k, v in invocation_params_fn(inputs).items() if v is not None
            }
            if invocation_params and isinstance(invocation_params, dict):
                metadata_.update(invocation_params)
        except BaseException as e:
            LOGGER.error(f"Failed to infer invocation params for {name_}: {e}")
    process_inputs = container_input.get("process_inputs")
    if process_inputs:
        try:
            inputs = process_inputs(inputs)
        except BaseException as e:
            LOGGER.error(f"Failed to filter inputs for {name_}: {e}")
    tags_ = (langsmith_extra.get("tags") or []) + (outer_tags or [])
    context.run(_context._TAGS.set, tags_)
    # NOTE(review): `+=` extends the list already stored in the context
    # variable, so decorator-level tags are visible through it as well.
    tags_ += tags or []
    if parent_run_ is not None:
        new_run = parent_run_.create_child(
            name=name_,
            run_type=run_type,
            inputs=inputs,
            tags=tags_,
            extra=extra_inner,
            run_id=id_,
            attachments=attachments,
        )
    else:
        # Create RunTree kwargs conditionally to let RunTree generate id from start_time
        run_tree_kwargs = {
            "name": name_,
            "inputs": inputs,
            "run_type": run_type,
            "reference_example_id": ls_client._ensure_uuid(
                reference_example_id, accept_null=True
            ),
            "project_name": selected_project,
            "replicas": run_trees._REPLICAS.get(),
            "extra": extra_inner,
            "tags": tags_,
            "client": client_,
            "attachments": attachments,
            "dangerously_allow_filesystem": dangerously_allow_filesystem,
        }
        # Only pass id if user explicitly provided one
        if id_ is not None:
            run_tree_kwargs["id"] = ls_client._ensure_uuid(id_)
        new_run = run_trees.RunTree(**cast(Any, run_tree_kwargs))
    if utils.tracing_is_enabled() is True:
        # Posting is best-effort: a failure is logged, not raised.
        try:
            new_run.post()
        except BaseException as e:
            LOGGER.error(f"Failed to post run {new_run.id}: {e}")
    response_container = _TraceableContainer(
        new_run=new_run,
        project_name=selected_project,
        outer_project=outer_project,
        outer_metadata=outer_metadata,
        outer_tags=outer_tags,
        on_end=langsmith_extra.get("on_end"),
        _on_success=langsmith_extra.get("_on_success"),
        context=context,
        _token_event_logged=False,
    )
    # Make the new run and its project the ambient values inside `context`.
    context.run(_context._PROJECT_NAME.set, response_container["project_name"])
    context.run(_PARENT_RUN_TREE.set, response_container["new_run"])
    return response_container

1595 

1596 

def _handle_container_end(
    container: _TraceableContainer,
    outputs: Optional[Any] = None,
    error: Optional[BaseException] = None,
    outputs_processor: Optional[Callable[..., dict]] = None,
) -> None:
    """Handle the end of run.

    Applies ``outputs_processor`` to the outputs when one is given, then ends
    the run. Any failure along the way is logged as a warning and swallowed.
    """
    final_outputs = outputs
    try:
        if outputs_processor is not None:
            final_outputs = outputs_processor(final_outputs)
        _container_end(container, outputs=final_outputs, error=error)
    except BaseException as exc:
        LOGGER.warning(f"Unable to process trace outputs: {repr(exc)}")

1610 

1611 

def _is_traceable_function(func: Any) -> bool:
    """Return the ``__langsmith_traceable__`` marker of ``func``, or False if unset."""
    try:
        return func.__langsmith_traceable__
    except AttributeError:
        return False

1614 

1615 

def _get_inputs(
    signature: inspect.Signature, *args: Any, **kwargs: Any
) -> dict[str, Any]:
    """Return a dictionary of inputs from the function signature.

    Arguments are bound partially (missing ones are allowed), defaults are
    applied, ``self``/``cls`` are dropped, and the contents of a ``**kwargs``
    style parameter are flattened into the top level of the result.

    Args:
        signature: Signature of the traced function.
        *args: Positional arguments of the call.
        **kwargs: Keyword arguments of the call.

    Returns:
        Mapping of parameter name to the value it was called with.

    Raises:
        TypeError: If the arguments cannot be bound to the signature.
    """
    bound = signature.bind_partial(*args, **kwargs)
    bound.apply_defaults()
    arguments = dict(bound.arguments)
    arguments.pop("self", None)
    arguments.pop("cls", None)
    for param_name, param in signature.parameters.items():
        if param.kind == inspect.Parameter.VAR_KEYWORD and param_name in arguments:
            # Remove the container entry *before* merging its contents, so a
            # keyword argument that shares the container's name (for example
            # ``kwargs=...``) is not deleted after being flattened.
            arguments.update(arguments.pop(param_name))

    return arguments

1634 

1635 

def _get_inputs_safe(
    signature: inspect.Signature, *args: Any, **kwargs: Any
) -> dict[str, Any]:
    """Like `_get_inputs`, but never raises.

    On any failure the raw call arguments are returned under the ``"args"``
    and ``"kwargs"`` keys and the error is logged at debug level.
    """
    try:
        inputs = _get_inputs(signature, *args, **kwargs)
    except BaseException as e:
        LOGGER.debug(f"Failed to get inputs for {signature}: {e}")
        inputs = {"args": args, "kwargs": kwargs}
    return inputs

1644 

1645 

def _is_attachment(param: inspect.Parameter, func: Optional[Callable] = None) -> bool:
    """Tell whether a parameter is annotated as an attachment.

    A parameter qualifies when its annotation is `schemas.Attachment` or an
    `Annotated[...]` whose arguments include it. String annotations are
    resolved through ``func``'s type hints when ``func`` is given.
    """

    def _names_attachment(annotation: Any):
        # Same predicate for raw and resolved annotations.
        return annotation == schemas.Attachment or (
            get_origin(annotation) == Annotated
            and any(arg == schemas.Attachment for arg in get_args(annotation))
        )

    if _names_attachment(param.annotation):
        return True

    # try resolving stringified annotations
    if func is None or not isinstance(param.annotation, str):
        return False
    try:
        # include_extras=True preserves annotated metadata
        hints = get_type_hints(func, include_extras=True)
        resolved = hints.get(param.name)
        if resolved is not None:
            return _names_attachment(resolved)
    except (NameError, TypeError, AttributeError):
        pass

    return False

1671 

1672 

def _attachment_args_helper(
    signature: inspect.Signature, func: Optional[Callable] = None
) -> set[str]:
    """Collect the names of all parameters annotated as attachments."""
    attachment_names: set[str] = set()
    for param_name, parameter in signature.parameters.items():
        if _is_attachment(parameter, func):
            attachment_names.add(param_name)
    return attachment_names

1681 

1682 

@functools.lru_cache(maxsize=1000)
def _cached_attachment_args(
    signature: inspect.Signature, func: Optional[Callable] = None
) -> set[str]:
    """Memoized variant of `_attachment_args_helper`.

    Results are cached per ``(signature, func)`` pair, up to 1000 entries.
    Both arguments must be hashable; otherwise the cache lookup raises
    ``TypeError``. The returned set is the cached object itself, so callers
    must not mutate it.
    """
    return _attachment_args_helper(signature, func)

1688 

1689 

def _attachment_args(
    signature: inspect.Signature, func: Optional[Callable] = None
) -> set[str]:
    """Return the attachment parameter names, using the cache when possible."""
    # Caching signatures fails if there's unhashable default values.
    try:
        names = _cached_attachment_args(signature, func)
    except TypeError:
        names = _attachment_args_helper(signature, func)
    return names

1698 

1699 

def _get_inputs_and_attachments_safe(
    signature: inspect.Signature,
    *args: Any,
    func: Optional[Callable] = None,
    **kwargs: Any,
) -> tuple[dict, schemas.Attachments]:
    """Split the call arguments into regular inputs and attachments.

    Never raises: on any failure a warning is logged and the raw arguments
    are returned under ``"args"``/``"kwargs"`` with no attachments.

    Returns:
        A ``(inputs, attachments)`` pair of dicts keyed by parameter name.
    """
    try:
        bound_inputs = _get_inputs(signature, *args, **kwargs)
        attachment_names = _attachment_args(signature, func)
        if not attachment_names:
            return bound_inputs, {}
        regular = {
            key: value
            for key, value in bound_inputs.items()
            if key not in attachment_names
        }
        attached = {
            key: value
            for key, value in bound_inputs.items()
            if key in attachment_names
        }
        return regular, attached
    except BaseException as e:
        LOGGER.warning(f"Failed to get inputs for {signature}: {e}")
        return {"args": args, "kwargs": kwargs}, {}

1721 

1722 

def _set_tracing_context(context: Optional[dict[str, Any]] = None):
    """Set the tracing context.

    With no argument every known tracing context variable is reset to None.
    Otherwise each key of ``context`` is written to its matching variable;
    a key that is not in ``_CONTEXT_KEYS`` raises ``KeyError``.
    """
    if context is None:
        for context_var in _CONTEXT_KEYS.values():
            context_var.set(None)
        return
    for key, value in context.items():
        _CONTEXT_KEYS[key].set(value)

1732 

1733 

def _process_iterator(
    generator: Iterator[T],
    run_container: _TraceableContainer,
    is_llm_run: bool,
    # Results is mutated
    results: list[Any],
    process_chunk: Optional[Callable],
    otel_context_manager: Optional[Any] = None,
) -> Generator[T, None, Any]:
    """Yield items from ``generator`` while recording them for tracing.

    Each item is pulled inside the container's context. The (optionally
    processed) item is appended to ``results``; the original item is yielded.
    For LLM runs a single ``new_token`` event is added for the first item.

    Returns:
        The value carried by the ``StopIteration`` that ends iteration.
    """

    def _pull_with_otel_context():
        # A fresh otel context is created from the run for every pull.
        run_tree = run_container.get("new_run")
        fresh_otel_context = (
            _maybe_create_otel_context(run_tree) if run_tree else None
        )
        if fresh_otel_context:
            with fresh_otel_context:
                return next(generator)
        return next(generator)

    try:
        while True:
            run_context = run_container["context"]
            if otel_context_manager:
                item: T = run_context.run(_pull_with_otel_context)  # type: ignore[arg-type]
            else:
                item = run_context.run(next, generator)  # type: ignore[arg-type]
            traced_item = process_chunk(item) if process_chunk else item
            if (
                is_llm_run
                and run_container["new_run"]
                and not run_container.get("_token_event_logged")
            ):
                timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat()
                run_container["new_run"].add_event(
                    {
                        "name": "new_token",
                        "time": timestamp,
                        "kwargs": {"token": traced_item},
                    }
                )
                run_container["_token_event_logged"] = True
            results.append(traced_item)
            yield item
    except StopIteration as stop:
        return stop.value

1783 

1784 

async def _process_async_iterator(
    generator: AsyncIterator[T],
    run_container: _TraceableContainer,
    *,
    is_llm_run: bool,
    accepts_context: bool,
    results: list[Any],
    process_chunk: Optional[Callable],
    otel_context_manager: Optional[Any] = None,
) -> AsyncGenerator[T, None]:
    """Yield items from an async iterator while recording them for tracing.

    Each item is awaited with the container's context applied: either by
    running the fetch in a task created with that context
    (``accepts_context``), or by entering `tracing_context` with the values
    read from it. The (optionally processed) item is appended to
    ``results``, which is mutated in place; the original item is yielded.
    For LLM runs a single ``new_token`` event is added for the first item.

    Args:
        generator: The async iterator being traced.
        run_container: Container returned by run setup.
        is_llm_run: Whether token events should be recorded.
        accepts_context: Whether `asyncio.create_task` can be given a
            ``context`` argument.
        results: List that receives every traced item.
        process_chunk: Optional transform applied to items before recording.
        otel_context_manager: Only its truthiness is used here; when truthy,
            a fresh otel context is created for every fetch.
    """
    try:
        while True:
            if otel_context_manager:
                # Create a fresh context manager for each iteration
                async def anext_with_otel_context():
                    # Get the run_tree from run_container to create a fresh context
                    run_tree = run_container.get("new_run")
                    if run_tree:
                        fresh_otel_context = _maybe_create_otel_context(run_tree)
                        if fresh_otel_context:
                            with fresh_otel_context:
                                return await aitertools.py_anext(generator)
                    # No run or no otel context available: fetch directly.
                    return await aitertools.py_anext(generator)

                if accepts_context:
                    item = await asyncio.create_task(  # type: ignore[call-arg, var-annotated]
                        anext_with_otel_context(),
                        context=run_container["context"],
                    )
                else:
                    # Python < 3.11
                    with tracing_context(
                        **get_tracing_context(run_container["context"])
                    ):
                        item = await anext_with_otel_context()
            else:
                if accepts_context:
                    item = await asyncio.create_task(  # type: ignore[call-arg, var-annotated]
                        aitertools.py_anext(generator),  # type: ignore[arg-type]
                        context=run_container["context"],
                    )
                else:
                    # Python < 3.11
                    with tracing_context(
                        **get_tracing_context(run_container["context"])
                    ):
                        item = await aitertools.py_anext(generator)
            if process_chunk:
                traced_item = process_chunk(item)
            else:
                traced_item = item
            # Only the first item of an LLM run is logged as a token event.
            if (
                is_llm_run
                and run_container["new_run"]
                and not run_container.get("_token_event_logged")
            ):
                run_container["new_run"].add_event(
                    {
                        "name": "new_token",
                        "time": datetime.datetime.now(
                            datetime.timezone.utc
                        ).isoformat(),
                        "kwargs": {"token": traced_item},
                    }
                )
                run_container["_token_event_logged"] = True
            results.append(traced_item)
            yield item
    except StopAsyncIteration:
        # Normal end of the wrapped iterator.
        pass

1855 

1856 

# Element type of the wrapped streams.
T = TypeVar("T")


class _TracedStreamBase(Generic[T]):
    """Base class for traced stream objects.

    Wraps a stream together with the trace container of the run that
    produced it. Unknown attributes are forwarded to the wrapped stream, and
    the run is ended (once) through `_end_trace`.
    """

    def __init__(
        self,
        stream: Union[Iterator[T], AsyncIterator[T]],
        trace_container: _TraceableContainer,
        reduce_fn: Optional[Callable] = None,
    ):
        """Store the stream and its tracing state.

        Args:
            stream: The stream being wrapped.
            trace_container: Container of the run associated with the stream.
            reduce_fn: Optional function that turns the accumulated items
                into the final recorded output.
        """
        self.__ls_stream__ = stream
        self.__ls_trace_container__ = trace_container
        self.__ls_completed__ = False
        self.__ls_reduce_fn__ = reduce_fn
        self.__ls_accumulated_output__: list[T] = []
        self.__is_llm_run__ = (
            trace_container["new_run"].run_type == "llm"
            if trace_container["new_run"]
            else False
        )

    def __getattr__(self, name: str):
        # Only called when normal lookup fails. If the wrapped stream itself
        # is missing (instance created without __init__, e.g. by copy or
        # unpickling), delegating would recurse forever, so fail cleanly.
        if name == "__ls_stream__":
            raise AttributeError(name)
        return getattr(self.__ls_stream__, name)

    def __dir__(self):
        return list(set(dir(self.__class__) + dir(self.__ls_stream__)))

    def __repr__(self):
        return f"Traceable({self.__ls_stream__!r})"

    def __str__(self):
        return str(self.__ls_stream__)

    def __del__(self):
        # Finalizers must never raise, so both steps are best-effort.
        try:
            if not self.__ls_completed__:
                self._end_trace()
        except BaseException:
            pass
        try:
            self.__ls_stream__.__del__()
        except BaseException:
            pass

    def _end_trace(self, error: Optional[BaseException] = None):
        """End the run with the accumulated output; later calls are no-ops.

        Args:
            error: Exception to record on the run, if any.
        """
        if self.__ls_completed__:
            return
        try:
            if self.__ls_reduce_fn__:
                reduced_output = self.__ls_reduce_fn__(self.__ls_accumulated_output__)
            else:
                reduced_output = self.__ls_accumulated_output__
            _container_end(
                self.__ls_trace_container__,
                outputs=reduced_output,
                error=error,
            )
        finally:
            # Mark as completed even if ending the run failed.
            self.__ls_completed__ = True

1918 

1919 

class _TracedStream(_TracedStreamBase, Generic[T]):
    """A wrapper for synchronous stream objects that handles tracing."""

    def __init__(
        self,
        stream: Iterator[T],
        trace_container: _TraceableContainer,
        reduce_fn: Optional[Callable] = None,
        process_chunk: Optional[Callable] = None,
    ):
        """Wrap ``stream`` so that the chunks it yields are recorded.

        Args:
            stream: The synchronous iterator to wrap.
            trace_container: Mapping describing the run being traced.
            reduce_fn: Optional callable that reduces the accumulated chunks
                into the run's final output.
            process_chunk: Optional callable forwarded to
                ``_process_iterator`` together with the results list.
        """
        # The base initializer already stores ``stream`` as __ls_stream__.
        super().__init__(
            stream=stream,
            trace_container=trace_container,
            reduce_fn=reduce_fn,
        )
        self.__ls__gen__ = _process_iterator(
            self.__ls_stream__,
            self.__ls_trace_container__,
            is_llm_run=self.__is_llm_run__,
            results=self.__ls_accumulated_output__,
            process_chunk=process_chunk,
        )

    def __next__(self) -> T:
        """Return the next chunk, ending the trace on exhaustion or error."""
        try:
            return next(self.__ls__gen__)
        except StopIteration:
            self._end_trace()
            raise
        except BaseException as e:
            # Mirror __iter__: a failure while pulling manually must be
            # recorded on the run rather than leaving it to be closed
            # without an error later on.
            _cleanup_traceback(e)
            self._end_trace(error=e)
            raise

    def __iter__(self) -> Iterator[T]:
        """Yield every chunk, then end the trace (with the error, if any)."""
        try:
            yield from self.__ls__gen__
        except BaseException as e:
            _cleanup_traceback(e)
            self._end_trace(error=e)
            raise
        else:
            self._end_trace()

    def __enter__(self):
        """Enter the wrapped stream's context and return this wrapper."""
        self.__ls_stream__.__enter__()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit the wrapped stream's context, then end the trace."""
        try:
            return self.__ls_stream__.__exit__(exc_type, exc_val, exc_tb)
        finally:
            self._end_trace(error=exc_val if exc_type else None)

1970 

1971 

class _TracedAsyncStream(_TracedStreamBase, Generic[T]):
    """A wrapper for asynchronous stream objects that handles tracing."""

    def __init__(
        self,
        stream: AsyncIterator[T],
        trace_container: _TraceableContainer,
        reduce_fn: Optional[Callable] = None,
        process_chunk: Optional[Callable] = None,
    ):
        """Wrap ``stream`` so that the chunks it yields are recorded.

        Args:
            stream: The asynchronous iterator to wrap.
            trace_container: Mapping describing the run being traced.
            reduce_fn: Optional callable that reduces the accumulated chunks
                into the run's final output.
            process_chunk: Optional callable forwarded to
                ``_process_async_iterator`` together with the results list.
        """
        # The base initializer already stores ``stream`` as __ls_stream__.
        super().__init__(
            stream=stream,
            trace_container=trace_container,
            reduce_fn=reduce_fn,
        )
        self.__ls_gen = _process_async_iterator(
            generator=self.__ls_stream__,
            run_container=self.__ls_trace_container__,
            is_llm_run=self.__is_llm_run__,
            accepts_context=aitertools.asyncio_accepts_context(),
            results=self.__ls_accumulated_output__,
            process_chunk=process_chunk,
        )

    async def _aend_trace(self, error: Optional[BaseException] = None):
        """End the trace from async code.

        ``_end_trace`` is dispatched through ``aitertools.aio_to_thread`` with
        a copy of the current context and shielded from cancellation; the
        tracing context read back from that copy is then re-applied here.

        Args:
            error: Exception to attach to the run, if the stream failed.
        """
        ctx = copy_context()
        await asyncio.shield(
            aitertools.aio_to_thread(self._end_trace, error, __ctx=ctx)
        )
        _set_tracing_context(get_tracing_context(ctx))

    async def __anext__(self) -> T:
        """Return the next chunk, ending the trace on exhaustion or error."""
        try:
            return cast(T, await aitertools.py_anext(self.__ls_gen))
        except StopAsyncIteration:
            await self._aend_trace()
            raise
        except BaseException as e:
            # A failure while pulling manually must be recorded on the run.
            _cleanup_traceback(e)
            await self._aend_trace(error=e)
            raise

    async def __aiter__(self) -> AsyncIterator[T]:
        """Yield every chunk, then end the trace (with the error, if any)."""
        try:
            async for item in self.__ls_gen:
                yield item
        except BaseException as e:
            # Pass the exception along, as the synchronous wrapper does;
            # otherwise the run is closed as if it had succeeded.
            _cleanup_traceback(e)
            await self._aend_trace(error=e)
            raise
        else:
            await self._aend_trace()

    async def __aenter__(self):
        """Enter the wrapped stream's async context and return this wrapper."""
        await self.__ls_stream__.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit the wrapped stream's async context, then end the trace."""
        try:
            return await self.__ls_stream__.__aexit__(exc_type, exc_val, exc_tb)
        finally:
            # Record the exception that is leaving the ``async with`` body,
            # matching the synchronous ``__exit__``.
            await self._aend_trace(error=exc_val if exc_type else None)

2030 

2031 

2032def _get_function_result(results: list, reduce_fn: Callable) -> Any: 

2033 if results: 

2034 if reduce_fn is not None: 

2035 try: 

2036 return reduce_fn(results) 

2037 except BaseException as e: 

2038 LOGGER.error(e) 

2039 return results 

2040 else: 

2041 return results 

2042 

2043 

2044def _cleanup_traceback(e: BaseException): 

2045 tb_ = e.__traceback__ 

2046 if tb_: 

2047 while tb_.tb_next is not None and tb_.tb_frame.f_code.co_filename.endswith( 

2048 _EXCLUDED_FRAME_FNAME 

2049 ): 

2050 tb_ = tb_.tb_next 

2051 e.__traceback__ = tb_ 

2052 

2053 

2054def _is_otel_available() -> bool: 

2055 """Cache for otel import check.""" 

2056 global _OTEL_AVAILABLE 

2057 if _OTEL_AVAILABLE is None: 

2058 try: 

2059 import opentelemetry.trace # noqa: F401 

2060 

2061 _OTEL_AVAILABLE = True 

2062 except ImportError: 

2063 _OTEL_AVAILABLE = False 

2064 return _OTEL_AVAILABLE 

2065 

2066 

2067def _maybe_create_otel_context(run_tree: Optional[run_trees.RunTree]): 

2068 """Create OpenTelemetry context manager if OTEL is enabled and available. 

2069 

2070 Args: 

2071 run_tree: The current run tree. 

2072 

2073 Returns: 

2074 Context manager for use_span or None if not available. 

2075 """ 

2076 if not run_tree or not utils.is_env_var_truish("OTEL_ENABLED"): 

2077 return None 

2078 if not _is_otel_available(): 

2079 return None 

2080 

2081 from opentelemetry.trace import ( # type: ignore[import] 

2082 NonRecordingSpan, 

2083 SpanContext, 

2084 TraceFlags, 

2085 TraceState, 

2086 use_span, 

2087 ) 

2088 

2089 from langsmith._internal._otel_utils import ( 

2090 get_otel_span_id_from_uuid, 

2091 get_otel_trace_id_from_uuid, 

2092 ) 

2093 

2094 trace_id = get_otel_trace_id_from_uuid(run_tree.trace_id) 

2095 span_id = get_otel_span_id_from_uuid(run_tree.id) 

2096 

2097 span_context = SpanContext( 

2098 trace_id=trace_id, 

2099 span_id=span_id, 

2100 is_remote=False, 

2101 trace_flags=TraceFlags(TraceFlags.SAMPLED), 

2102 trace_state=TraceState(), 

2103 ) 

2104 

2105 non_recording_span = NonRecordingSpan(span_context) 

2106 return use_span(non_recording_span) 

2107 

2108 

# For backwards compatibility: expose the context variables defined in
# ``langsmith._internal._context`` under module-level names here, so that
# references to them through this module keep resolving.
_PROJECT_NAME = _context._PROJECT_NAME
_TAGS = _context._TAGS
_METADATA = _context._METADATA
_TRACING_ENABLED = _context._TRACING_ENABLED
_CLIENT = _context._CLIENT
_PARENT_RUN_TREE = _context._PARENT_RUN_TREE