Coverage for langsmith/async_client.py: 17%
487 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-12-11 16:15 -0800
« prev ^ index » next coverage.py v7.10.1, created at 2025-12-11 16:15 -0800
1"""The Async LangSmith Client."""
3from __future__ import annotations
5import asyncio
6import contextlib
7import datetime
8import json
9import uuid
10import warnings
11from collections.abc import AsyncGenerator, AsyncIterator, Mapping, Sequence
12from typing import (
13 Any,
14 Literal,
15 Optional,
16 Union,
17 cast,
18)
20import httpx
22from langsmith import client as ls_client
23from langsmith import schemas as ls_schemas
24from langsmith import utils as ls_utils
25from langsmith._internal import _beta_decorator as ls_beta
# Alias for identifier arguments: either a ``uuid.UUID`` instance or a ``str``.
ID_TYPE = Union[uuid.UUID, str]
30class AsyncClient:
31 """Async Client for interacting with the LangSmith API."""
33 __slots__ = ("_retry_config", "_client", "_web_url", "_settings")
def __init__(
    self,
    api_url: Optional[str] = None,
    api_key: Optional[str] = None,
    timeout_ms: Optional[
        Union[
            int, tuple[Optional[int], Optional[int], Optional[int], Optional[int]]
        ]
    ] = None,
    retry_config: Optional[Mapping[str, Any]] = None,
    web_url: Optional[str] = None,
):
    """Build the underlying ``httpx.AsyncClient`` and store client settings.

    Args:
        api_url: API base URL; resolved through ``ls_utils.get_api_url``.
        api_key: API key; resolved through ``ls_utils.get_api_key``. When the
            resolved key is truthy it is sent in the ``ls_client.X_API_KEY``
            header.
        timeout_ms: Timeout in milliseconds. An ``int`` becomes the first
            element of a 4-tuple (the rest ``None``); a tuple has each
            non-``None`` element converted to seconds; anything else falls
            back to ``10``. The result is passed as ``timeout`` to httpx.
        retry_config: Retry settings; defaults to ``{"max_retries": 3}`` when
            falsy.
        web_url: Stored as-is and later used to derive the host URL.
    """
    self._retry_config = retry_config or {"max_retries": 3}

    resolved_key = ls_utils.get_api_key(api_key)
    resolved_url = ls_utils.get_api_url(api_url)

    default_headers = {"Content-Type": "application/json"}
    if resolved_key:
        default_headers[ls_client.X_API_KEY] = resolved_key
    # NOTE(review): the dump lost indentation; validation is assumed to run
    # unconditionally (it receives the possibly-empty key) — confirm.
    ls_client._validate_api_key_if_hosted(resolved_url, resolved_key)

    # Milliseconds -> seconds for httpx.
    timeout_: Union[tuple, float]
    if isinstance(timeout_ms, int):
        timeout_ = (timeout_ms / 1000, None, None, None)
    elif isinstance(timeout_ms, tuple):
        timeout_ = tuple(
            [None if part is None else part / 1000 for part in timeout_ms]
        )
    else:
        timeout_ = 10

    self._client = httpx.AsyncClient(
        base_url=resolved_url,
        headers=default_headers,
        timeout=timeout_,
    )
    self._web_url = web_url
    self._settings: Optional[ls_schemas.LangSmithSettings] = None
async def __aenter__(self) -> AsyncClient:
    """Return this client so ``async with`` can bind it."""
    client = self
    return client
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Close the client when the ``async with`` block ends.

    The exception details are ignored; the method returns ``None``, so any
    in-flight exception keeps propagating.
    """
    closing = self.aclose()
    await closing
async def aclose(self):
    """Close the wrapped HTTP client by awaiting its ``aclose()``."""
    http_client = self._client
    await http_client.aclose()
@property
def _api_url(self):
    """The HTTP client's ``base_url`` rendered as a string."""
    base = self._client.base_url
    return str(base)
@property
def _host_url(self) -> str:
    """The web host url, derived from the stored web URL and the API URL."""
    web_url, api_url = self._web_url, self._api_url
    return ls_utils.get_host_url(web_url, api_url)
async def _arequest_with_retries(
    self,
    method: str,
    endpoint: str,
    **kwargs: Any,
) -> httpx.Response:
    """Make an async HTTP request, retrying on transport-level failures.

    Args:
        method: HTTP method name passed to the underlying client.
        endpoint: Path (relative to the client's base URL) to request.
        **kwargs: Forwarded to ``httpx.AsyncClient.request``. When ``params``
            is a mapping, entries whose value is ``None`` are dropped; a
            ``params`` of ``None`` is removed entirely.

    Returns:
        The response, once ``ls_utils.raise_for_status_with_text`` accepts it.

    Raises:
        ls_utils.LangSmithAPIError: On HTTP 500, or if no attempt was made
            (``max_retries`` of 0).
        ls_utils.LangSmithRequestTimeout: On HTTP 408.
        ls_utils.LangSmithRateLimitError: On HTTP 429.
        ls_utils.LangSmithAuthError: On HTTP 401.
        ls_utils.LangSmithNotFoundError: On HTTP 404.
        ls_utils.LangSmithConflictError: On HTTP 409.
        ls_utils.LangSmithError: On any other HTTP status error.
        ls_utils.LangSmithConnectionError: When the final attempt fails with
            an ``httpx.RequestError``.
    """
    max_retries = cast(int, self._retry_config.get("max_retries", 3))

    # Python requests library used by the normal Client filters out params
    # with None values. The httpx library does not. Filter them out here to
    # keep behavior consistent.
    params = kwargs.get("params")
    if params is None:
        # An explicit ``params=None`` means "no query string"; it must not
        # reach ``.items()`` below.
        kwargs.pop("params", None)
    elif isinstance(params, Mapping):
        kwargs["params"] = {k: v for k, v in params.items() if v is not None}
    # Any other params shape (e.g. a list of pairs) is forwarded untouched.

    for attempt in range(max_retries):
        try:
            response = await self._client.request(method, endpoint, **kwargs)
            ls_utils.raise_for_status_with_text(response)
            return response
        except httpx.HTTPStatusError as e:
            # HTTP status errors are never retried; translate and re-raise
            # with the original exception chained for debugging.
            status_code = e.response.status_code
            if status_code == 500:
                raise ls_utils.LangSmithAPIError(
                    f"Server error caused failure to {method}"
                    f" {endpoint} in"
                    f" LangSmith API. {repr(e)}"
                ) from e
            elif status_code == 408:
                raise ls_utils.LangSmithRequestTimeout(
                    f"Client took too long to send request to {method}{endpoint}"
                ) from e
            elif status_code == 429:
                raise ls_utils.LangSmithRateLimitError(
                    f"Rate limit exceeded for {endpoint}. {repr(e)}"
                ) from e
            elif status_code == 401:
                raise ls_utils.LangSmithAuthError(
                    f"Authentication failed for {endpoint}. {repr(e)}"
                ) from e
            elif status_code == 404:
                raise ls_utils.LangSmithNotFoundError(
                    f"Resource not found for {endpoint}. {repr(e)}"
                ) from e
            elif status_code == 409:
                raise ls_utils.LangSmithConflictError(
                    f"Conflict for {endpoint}. {repr(e)}"
                ) from e
            else:
                raise ls_utils.LangSmithError(
                    f"Failed to {method} {endpoint} in LangSmith API. {repr(e)}"
                ) from e
        except httpx.RequestError as e:
            if attempt == max_retries - 1:
                raise ls_utils.LangSmithConnectionError(
                    f"Request error: {repr(e)}"
                ) from e
            # Exponential backoff between attempts: 1s, 2s, 4s, ...
            await asyncio.sleep(2**attempt)
    raise ls_utils.LangSmithAPIError(
        "Unexpected error connecting to the LangSmith API"
    )
async def _aget_paginated_list(
    self,
    path: str,
    params: Optional[dict[str, Any]] = None,
) -> AsyncIterator[dict[str, Any]]:
    """Yield items from an offset/limit paginated GET endpoint.

    Args:
        path: Endpoint path to request.
        params: Query parameters. ``offset`` (default 0) sets the starting
            position and ``limit`` (default 100) the page size; a value of
            ``None`` for either is treated as missing. The mapping passed in
            is not modified.

    Yields:
        Each item of each page, in order. Iteration stops at the first empty
        page or the first page shorter than ``limit``.
    """
    # Work on a private copy so the caller's dict never gains or changes
    # "limit"/"offset" keys as a side effect of iterating.
    query: dict[str, Any] = dict(params) if params else {}
    offset = query.get("offset") or 0
    limit = query.get("limit")
    if limit is None:
        limit = 100
    query["limit"] = limit
    while True:
        query["offset"] = offset
        response = await self._arequest_with_retries("GET", path, params=query)
        items = response.json()
        if not items:
            break
        for item in items:
            yield item
        if len(items) < limit:
            # A short page means the server has nothing further.
            break
        offset += len(items)
async def _aget_cursor_paginated_list(
    self,
    path: str,
    *,
    body: Optional[dict] = None,
    request_method: str = "POST",
    data_key: str = "runs",
) -> AsyncIterator[dict]:
    """Yield items from a cursor-paginated endpoint.

    Args:
        path: Endpoint path to request.
        body: Request payload; a shallow copy is used so the caller's dict is
            left untouched when the cursor is added.
        request_method: HTTP method used for each page request.
        data_key: Key in the response body that holds the page's items.

    Yields:
        Each item found under ``data_key``, page after page, until a page is
        empty, lacks items, or carries no ``cursors["next"]`` value.
    """
    payload = body.copy() if body else {}
    while True:
        response = await self._arequest_with_retries(
            request_method,
            path,
            content=ls_client._dumps_json(payload),
        )
        page = response.json()
        records = page.get(data_key) if page else None
        if not records:
            return
        for record in records:
            yield record
        next_cursor = (page.get("cursors") or {}).get("next")
        if not next_cursor:
            return
        # Ask for the following page on the next round trip.
        payload["cursor"] = next_cursor
async def create_run(
    self,
    name: str,
    inputs: dict[str, Any],
    run_type: str,
    *,
    project_name: Optional[str] = None,
    revision_id: Optional[ls_client.ID_TYPE] = None,
    **kwargs: Any,
) -> None:
    """Create a run by POSTing it to ``/runs``.

    Args:
        name: Name of the run.
        inputs: Inputs recorded for the run.
        run_type: Type of the run.
        project_name: Project to log to; when falsy,
            ``ls_utils.get_tracer_project()`` is used instead.
        revision_id: Optional revision identifier sent with the run.
        **kwargs: Extra fields merged into the payload. They take precedence
            over the defaults above, except that a missing or falsy ``id`` is
            replaced by a freshly generated UUID.
    """
    run_create = {
        "name": name,
        "id": None,
        "inputs": inputs,
        "run_type": run_type,
        "session_name": project_name or ls_utils.get_tracer_project(),
        "revision_id": revision_id,
        **kwargs,
    }
    # Resolve the id *after* merging kwargs: an explicit ``id=None`` in kwargs
    # would otherwise overwrite the generated fallback.
    if not run_create["id"]:
        run_create["id"] = uuid.uuid4()
    await self._arequest_with_retries(
        "POST", "/runs", content=ls_client._dumps_json(run_create)
    )
async def update_run(
    self,
    run_id: ls_client.ID_TYPE,
    **kwargs: Any,
) -> None:
    """Patch an existing run.

    Args:
        run_id: Identifier of the run; normalised with ``ls_client._as_uuid``.
        **kwargs: Fields to send in the PATCH body. The ``id`` entry is always
            set to the normalised run id.
    """
    run_uuid = ls_client._as_uuid(run_id)
    payload = dict(kwargs)
    payload["id"] = run_uuid
    await self._arequest_with_retries(
        "PATCH",
        f"/runs/{run_uuid}",
        content=ls_client._dumps_json(payload),
    )
async def read_run(self, run_id: ls_client.ID_TYPE) -> ls_schemas.Run:
    """Fetch a single run by id and return it as ``ls_schemas.Run``."""
    run_uuid = ls_client._as_uuid(run_id)
    response = await self._arequest_with_retries("GET", f"/runs/{run_uuid}")
    payload = response.json()
    return ls_schemas.Run(**payload)
async def list_runs(
    self,
    *,
    project_id: Optional[
        Union[ls_client.ID_TYPE, Sequence[ls_client.ID_TYPE]]
    ] = None,
    project_name: Optional[Union[str, Sequence[str]]] = None,
    run_type: Optional[str] = None,
    trace_id: Optional[ls_client.ID_TYPE] = None,
    reference_example_id: Optional[ls_client.ID_TYPE] = None,
    query: Optional[str] = None,
    filter: Optional[str] = None,
    trace_filter: Optional[str] = None,
    tree_filter: Optional[str] = None,
    is_root: Optional[bool] = None,
    parent_run_id: Optional[ls_client.ID_TYPE] = None,
    start_time: Optional[datetime.datetime] = None,
    error: Optional[bool] = None,
    run_ids: Optional[Sequence[ls_client.ID_TYPE]] = None,
    select: Optional[Sequence[str]] = None,
    limit: Optional[int] = None,
    **kwargs: Any,
) -> AsyncIterator[ls_schemas.Run]:
    """List runs from the LangSmith API.

    Parameters
    ----------
    project_id : UUID, str, sequence of those, or None, default=None
        The ID(s) of the project to filter by. Any sequence type
        (list, tuple, ...) is accepted.
    project_name : str, sequence of str, or None, default=None
        The name(s) of the project to filter by. Each name is resolved to
        its ID via ``read_project``.
    run_type : str or None, default=None
        The type of the runs to filter by.
    trace_id : UUID or None, default=None
        The ID of the trace to filter by.
    reference_example_id : UUID or None, default=None
        The ID of the reference example to filter by.
    query : str or None, default=None
        The query string to filter by.
    filter : str or None, default=None
        The filter string to filter by.
    trace_filter : str or None, default=None
        Filter to apply to the ROOT run in the trace tree. This is meant to
        be used in conjunction with the regular `filter` parameter to let you
        filter runs by attributes of the root run within a trace.
    tree_filter : str or None, default=None
        Filter to apply to OTHER runs in the trace tree, including
        sibling and child runs. This is meant to be used in conjunction with
        the regular `filter` parameter to let you filter runs by attributes
        of any run within a trace.
    is_root : bool or None, default=None
        Whether to filter by root runs.
    parent_run_id : UUID or None, default=None
        The ID of the parent run to filter by.
    start_time : datetime or None, default=None
        The start time to filter by.
    error : bool or None, default=None
        Whether to filter by error status.
    run_ids : List[str or UUID] or None, default=None
        The IDs of the runs to filter by.
    select : List[str] or None, default=None
        The fields to request for each run. Requesting ``child_run_ids``
        emits a ``DeprecationWarning``.
    limit : int or None, default=None
        The maximum number of runs to return.
    **kwargs : Any
        Additional keyword arguments, merged into the query body.

    Yields:
    ------
    Run
        The runs.

    Examples:
    --------
    This method is an async generator; consume it with ``async for``:

    ```python
    async for run in client.list_runs(project_name="<your_project>"):
        print(run.id)
    ```

    List all runs in a project:

    ```python
    project_runs = client.list_runs(project_name="<your_project>")
    ```

    List LLM and Chat runs in the last 24 hours:

    ```python
    todays_llm_runs = client.list_runs(
        project_name="<your_project>",
        start_time=datetime.now() - timedelta(days=1),
        run_type="llm",
    )
    ```

    List root traces in a project:

    ```python
    root_runs = client.list_runs(project_name="<your_project>", is_root=1)
    ```

    List runs without errors:

    ```python
    correct_runs = client.list_runs(project_name="<your_project>", error=False)
    ```

    List runs and only return their inputs/outputs (to speed up the query):

    ```python
    input_output_runs = client.list_runs(
        project_name="<your_project>", select=["inputs", "outputs"]
    )
    ```

    List runs by run ID:

    ```python
    run_ids = [
        "a36092d2-4ad5-4fb4-9c0d-0dba9a2ed836",
        "9398e6be-964f-4aa4-8ae9-ad78cd4b7074",
    ]
    selected_runs = client.list_runs(run_ids=run_ids)
    ```

    List all "chain" type runs that took more than 10 seconds and had
    `total_tokens` greater than 5000:

    ```python
    chain_runs = client.list_runs(
        project_name="<your_project>",
        filter='and(eq(run_type, "chain"), gt(latency, 10), gt(total_tokens, 5000))',
    )
    ```

    List all runs called "extractor" whose root of the trace was assigned feedback "user_score" score of 1:

    ```python
    good_extractor_runs = client.list_runs(
        project_name="<your_project>",
        filter='eq(name, "extractor")',
        trace_filter='and(eq(feedback_key, "user_score"), eq(feedback_score, 1))',
    )
    ```

    List all runs that started after a specific timestamp and either have "error" not equal to null or a "Correctness" feedback score equal to 0:

    ```python
    complex_runs = client.list_runs(
        project_name="<your_project>",
        filter='and(gt(start_time, "2023-07-15T12:34:56Z"), or(neq(error, null), and(eq(feedback_key, "Correctness"), eq(feedback_score, 0.0))))',
    )
    ```

    List all runs where `tags` include "experimental" or "beta" and `latency` is greater than 2 seconds:

    ```python
    tagged_runs = client.list_runs(
        project_name="<your_project>",
        filter='and(or(has(tags, "experimental"), has(tags, "beta")), gt(latency, 2))',
    )
    ```
    """
    project_ids = []
    if isinstance(project_id, (uuid.UUID, str)):
        project_ids.append(project_id)
    elif project_id is not None:
        # The signature accepts any Sequence; checking only for ``list``
        # silently dropped tuples and returned runs from every project.
        project_ids.extend(project_id)
    if project_name is not None:
        names = [project_name] if isinstance(project_name, str) else project_name
        # Resolve all project names concurrently.
        projects = await asyncio.gather(
            *[self.read_project(project_name=name) for name in names]
        )
        project_ids.extend([project.id for project in projects])

    if select and "child_run_ids" in select:
        warnings.warn(
            "The child_run_ids field is deprecated and will be removed in following versions",
            DeprecationWarning,
        )

    body_query: dict[str, Any] = {
        "session": project_ids if project_ids else None,
        "run_type": run_type,
        "reference_example": (
            [reference_example_id] if reference_example_id else None
        ),
        "query": query,
        "filter": filter,
        "trace_filter": trace_filter,
        "tree_filter": tree_filter,
        "is_root": is_root,
        "parent_run": parent_run_id,
        "start_time": start_time.isoformat() if start_time else None,
        "error": error,
        "id": run_ids,
        "trace": trace_id,
        "select": select,
        "limit": limit,
        **kwargs,
    }
    if project_ids:
        # Applied after **kwargs so resolved project ids always win.
        body_query["session"] = [
            str(ls_client._as_uuid(id_)) for id_ in project_ids
        ]
    # Unset filters are omitted from the request body altogether.
    body = {k: v for k, v in body_query.items() if v is not None}
    ix = 0
    async for run in self._aget_cursor_paginated_list("/runs/query", body=body):
        yield ls_schemas.Run(**run)
        ix += 1
        if limit is not None and ix >= limit:
            break
async def share_run(
    self, run_id: ls_client.ID_TYPE, *, share_id: Optional[ls_client.ID_TYPE] = None
) -> str:
    """Create (or fetch) a public share link for a run.

    Args:
        run_id (ID_TYPE): The ID of the run to share.
        share_id (Optional[ID_TYPE], optional): Share token to request.
            When falsy, a random UUID is generated.

    Returns:
        str: ``{host_url}/public/{share_token}/r`` built from the token in
        the API response.

    Raises:
        Errors raised by ``_arequest_with_retries`` for a failed request
        propagate to the caller.
    """
    run_uuid = ls_client._as_uuid(run_id, "run_id")
    requested_token = str(share_id or uuid.uuid4())
    payload = {
        "run_id": str(run_uuid),
        "share_token": requested_token,
    }
    response = await self._arequest_with_retries(
        "PUT",
        f"/runs/{run_uuid}/share",
        content=ls_client._dumps_json(payload),
    )
    ls_utils.raise_for_status_with_text(response)
    granted_token = response.json()["share_token"]
    return f"{self._host_url}/public/{granted_token}/r"
async def run_is_shared(self, run_id: ls_client.ID_TYPE) -> bool:
    """Return ``True`` when ``read_run_shared_link`` finds a link for the run."""
    run_uuid = ls_client._as_uuid(run_id, "run_id")
    return (await self.read_run_shared_link(run_uuid)) is not None
async def read_run_shared_link(self, run_id: ls_client.ID_TYPE) -> Optional[str]:
    """Look up the public share link of a run.

    Args:
        run_id (ID_TYPE): The ID of the run.

    Returns:
        Optional[str]: ``{host_url}/public/{share_token}/r`` when the API
        response contains a ``share_token``; ``None`` when the response is
        null or has no such key.

    Raises:
        Errors raised by ``_arequest_with_retries`` for a failed request
        propagate to the caller.
    """
    run_uuid = ls_client._as_uuid(run_id, 'run_id')
    response = await self._arequest_with_retries("GET", f"/runs/{run_uuid}/share")
    ls_utils.raise_for_status_with_text(response)
    payload = response.json()
    if payload is not None and "share_token" in payload:
        return f"{self._host_url}/public/{payload['share_token']}/r"
    return None
async def create_project(
    self,
    project_name: str,
    **kwargs: Any,
) -> ls_schemas.TracerSession:
    """Create a project by POSTing to ``/sessions``.

    Args:
        project_name: Sent as the ``name`` field.
        **kwargs: Additional fields merged into the request body; a ``name``
            entry here overrides ``project_name``.

    Returns:
        The API response parsed into ``ls_schemas.TracerSession``.
    """
    payload = {"name": project_name}
    payload.update(kwargs)
    response = await self._arequest_with_retries(
        "POST", "/sessions", content=ls_client._dumps_json(payload)
    )
    created = response.json()
    return ls_schemas.TracerSession(**created)
async def read_project(
    self,
    project_name: Optional[str] = None,
    project_id: Optional[ls_client.ID_TYPE] = None,
) -> ls_schemas.TracerSession:
    """Fetch a project by id or, failing that, by name.

    Args:
        project_name: Name to look up when ``project_id`` is falsy.
        project_id: Identifier of the project; takes precedence over the name.

    Returns:
        The matching ``ls_schemas.TracerSession``. When the API answers with
        a list, its first element is used.

    Raises:
        ValueError: If neither argument is provided.
        ls_utils.LangSmithNotFoundError: If the API answers with an empty list.
    """
    if not project_id and not project_name:
        raise ValueError("Either project_name or project_id must be provided")

    if project_id:
        response = await self._arequest_with_retries(
            "GET", f"/sessions/{ls_client._as_uuid(project_id)}"
        )
    else:
        response = await self._arequest_with_retries(
            "GET", "/sessions", params={"name": project_name}
        )

    data = response.json()
    if not isinstance(data, list):
        return ls_schemas.TracerSession(**data)
    if data:
        return ls_schemas.TracerSession(**data[0])
    raise ls_utils.LangSmithNotFoundError(f"Project {project_name} not found")
async def delete_project(
    self, *, project_name: Optional[str] = None, project_id: Optional[str] = None
) -> None:
    """Delete a project from LangSmith.

    Parameters
    ----------
    project_name : str or None, default=None
        The name of the project to delete. Used to look the project up
        (via ``read_project``) when no ``project_id`` is given.
    project_id : str or None, default=None
        The ID of the project to delete.

    Raises
    ------
    ValueError
        If both arguments are ``None``, or if the resulting project id is
        empty.
    """
    if project_id is None:
        if project_name is None:
            raise ValueError("Either project_name or project_id must be provided")
        project = await self.read_project(project_name=project_name)
        project_id = str(project.id)
    if not project_id:
        raise ValueError("Project not found")
    target = f"/sessions/{ls_client._as_uuid(project_id)}"
    await self._arequest_with_retries("DELETE", target)
async def create_dataset(
    self,
    dataset_name: str,
    **kwargs: Any,
) -> ls_schemas.Dataset:
    """Create a dataset by POSTing to ``/datasets``.

    Args:
        dataset_name: Sent as the ``name`` field.
        **kwargs: Additional fields merged into the request body; a ``name``
            entry here overrides ``dataset_name``.

    Returns:
        The API response parsed into ``ls_schemas.Dataset``.
    """
    payload = {"name": dataset_name}
    payload.update(kwargs)
    response = await self._arequest_with_retries(
        "POST", "/datasets", content=ls_client._dumps_json(payload)
    )
    created = response.json()
    return ls_schemas.Dataset(**created)
async def read_dataset(
    self,
    dataset_name: Optional[str] = None,
    dataset_id: Optional[ls_client.ID_TYPE] = None,
) -> ls_schemas.Dataset:
    """Fetch a dataset by id or, failing that, by name.

    Args:
        dataset_name: Name to look up when ``dataset_id`` is falsy.
        dataset_id: Identifier of the dataset; takes precedence over the name.

    Returns:
        The matching ``ls_schemas.Dataset``. When the API answers with a
        list, its first element is used.

    Raises:
        ValueError: If neither argument is provided.
        ls_utils.LangSmithNotFoundError: If the API answers with an empty list.
    """
    if not dataset_id and not dataset_name:
        raise ValueError("Either dataset_name or dataset_id must be provided")

    if dataset_id:
        response = await self._arequest_with_retries(
            "GET", f"/datasets/{ls_client._as_uuid(dataset_id)}"
        )
    else:
        response = await self._arequest_with_retries(
            "GET", "/datasets", params={"name": dataset_name}
        )

    data = response.json()
    if not isinstance(data, list):
        return ls_schemas.Dataset(**data)
    if data:
        return ls_schemas.Dataset(**data[0])
    raise ls_utils.LangSmithNotFoundError(f"Dataset {dataset_name} not found")
async def delete_dataset(self, dataset_id: ls_client.ID_TYPE) -> None:
    """Issue a DELETE for the dataset with the given id."""
    target = f"/datasets/{ls_client._as_uuid(dataset_id)}"
    await self._arequest_with_retries("DELETE", target)
async def list_datasets(
    self,
    **kwargs: Any,
) -> AsyncIterator[ls_schemas.Dataset]:
    """Yield every dataset returned by the paginated ``/datasets`` listing.

    ``kwargs`` are passed along as the query parameters of the listing.
    """
    pages = self._aget_paginated_list("/datasets", params=kwargs)
    async for raw in pages:
        yield ls_schemas.Dataset(**raw)
async def create_example(
    self,
    inputs: dict[str, Any],
    outputs: Optional[dict[str, Any]] = None,
    dataset_id: Optional[ls_client.ID_TYPE] = None,
    dataset_name: Optional[str] = None,
    **kwargs: Any,
) -> ls_schemas.Example:
    """Create an example in a dataset.

    Args:
        inputs: Inputs of the example.
        outputs: Optional outputs of the example.
        dataset_id: Target dataset id. When ``None``, the dataset is looked
            up by ``dataset_name`` via ``read_dataset``.
        dataset_name: Target dataset name, used only when ``dataset_id`` is
            ``None``.
        **kwargs: Additional fields merged into the request body.

    Returns:
        The API response parsed into ``ls_schemas.Example``.

    Raises:
        ValueError: If both ``dataset_id`` and ``dataset_name`` are ``None``.
    """
    if dataset_id is None:
        if dataset_name is None:
            raise ValueError("Either dataset_id or dataset_name must be provided")
        dataset = await self.read_dataset(dataset_name=dataset_name)
        dataset_id = dataset.id

    payload = {
        "inputs": inputs,
        "outputs": outputs,
        "dataset_id": str(dataset_id),
    }
    payload.update(kwargs)
    response = await self._arequest_with_retries(
        "POST", "/examples", content=ls_client._dumps_json(payload)
    )
    created = response.json()
    return ls_schemas.Example(**created)
async def read_example(self, example_id: ls_client.ID_TYPE) -> ls_schemas.Example:
    """Fetch a single example by id and return it as ``ls_schemas.Example``."""
    example_uuid = ls_client._as_uuid(example_id)
    response = await self._arequest_with_retries("GET", f"/examples/{example_uuid}")
    payload = response.json()
    return ls_schemas.Example(**payload)
async def list_examples(
    self,
    *,
    dataset_id: Optional[ls_client.ID_TYPE] = None,
    dataset_name: Optional[str] = None,
    **kwargs: Any,
) -> AsyncIterator[ls_schemas.Example]:
    """Yield examples from the paginated ``/examples`` listing.

    Args:
        dataset_id: Restrict the listing to this dataset id.
        dataset_name: Restrict the listing to the dataset with this name
            (resolved via ``read_dataset``); ignored when ``dataset_id`` is
            truthy.
        **kwargs: Additional query parameters.
    """
    query = dict(kwargs)
    if dataset_id:
        query["dataset"] = ls_client._as_uuid(dataset_id)
    elif dataset_name:
        resolved = await self.read_dataset(dataset_name=dataset_name)
        query["dataset"] = resolved.id

    pages = self._aget_paginated_list("/examples", params=query)
    async for raw in pages:
        yield ls_schemas.Example(**raw)
async def create_feedback(
    self,
    run_id: Optional[ls_client.ID_TYPE],
    key: str,
    score: Optional[float] = None,
    value: Optional[Any] = None,
    comment: Optional[str] = None,
    **kwargs: Any,
) -> ls_schemas.Feedback:
    """Create feedback by POSTing to ``/feedback``.

    Args:
        run_id (Optional[ls_client.ID_TYPE]): The run the feedback is about.
            ``None`` is accepted (passed through
            ``ls_client._ensure_uuid(..., accept_null=True)``).
        key (str): The name of the metric or aspect this feedback is about.
        score (Optional[float]): The score for the metric or aspect.
        value (Optional[Any]): The display or non-numeric value.
        comment (Optional[str]): A comment about this feedback.
        **kwargs: Additional fields merged into the request body.

    Returns:
        ls_schemas.Feedback: The API response parsed into a feedback object.

    Raises:
        Errors raised by ``_arequest_with_retries`` for a failed request
        propagate to the caller.
    """
    payload = {
        "run_id": ls_client._ensure_uuid(run_id, accept_null=True),
        "key": key,
        "score": score,
        "value": value,
        "comment": comment,
    }
    payload.update(kwargs)
    response = await self._arequest_with_retries(
        "POST", "/feedback", content=ls_client._dumps_json(payload)
    )
    created = response.json()
    return ls_schemas.Feedback(**created)
async def create_feedback_from_token(
    self,
    token_or_url: Union[str, uuid.UUID],
    score: Union[float, int, bool, None] = None,
    *,
    value: Union[float, int, bool, str, dict, None] = None,
    correction: Union[dict, None] = None,
    comment: Union[str, None] = None,
    metadata: Optional[dict] = None,
) -> None:
    """Submit feedback through a presigned token or URL.

    Args:
        token_or_url (Union[str, uuid.UUID]): The presigned token, or a URL
            containing it; parsed with ``ls_client._parse_token_or_url``.
        score (Union[float, int, bool, None], optional): Feedback score.
        value (Union[float, int, bool, str, dict, None], optional): Feedback
            value.
        correction (Union[dict, None], optional): Feedback correction.
        comment (Union[str, None], optional): Feedback comment.
        metadata (Optional[dict], optional): Additional feedback metadata.

    Raises:
        ValueError: If the API URL parsed from ``token_or_url`` differs from
            this client's API URL.

    Returns:
        None
    """
    own_api_url = self._api_url
    source_api_url, token_uuid = ls_client._parse_token_or_url(
        token_or_url, own_api_url, num_parts=1
    )
    # Only tokens issued for this client's API endpoint are accepted.
    if source_api_url != self._api_url:
        raise ValueError(f"Invalid source API URL. {source_api_url}")

    payload = {
        "score": score,
        "value": value,
        "correction": correction,
        "comment": comment,
        "metadata": metadata,
        # TODO: Add ID once the API supports it.
    }
    response = await self._arequest_with_retries(
        "POST",
        f"/feedback/tokens/{ls_client._as_uuid(token_uuid)}",
        content=ls_client._dumps_json(payload),
    )
    ls_utils.raise_for_status_with_text(response)
async def create_presigned_feedback_token(
    self,
    run_id: ls_client.ID_TYPE,
    feedback_key: str,
    *,
    expiration: Optional[datetime.datetime | datetime.timedelta] = None,
    feedback_config: Optional[ls_schemas.FeedbackConfig] = None,
    feedback_id: Optional[ls_client.ID_TYPE] = None,
) -> ls_schemas.FeedbackIngestToken:
    """Create a pre-signed URL to send feedback data to.

    This is useful for giving browser-based clients a way to upload
    feedback data directly to LangSmith without accessing the
    API key.

    Args:
        run_id: The run the feedback token is created for.
        feedback_key: The feedback key the token is created for.
        expiration: When the token expires. A ``datetime`` is sent as an
            absolute ``expires_at``; a ``timedelta`` is sent as an
            ``expires_in`` offset of days, hours and minutes (seconds are
            truncated). Defaults to 3 hours.
        feedback_config: FeedbackConfig or None, sent as-is in the request.
        feedback_id: The ID of the feedback to create. When falsy, a random
            UUID string is generated.

    Returns:
        The API response parsed into ``ls_schemas.FeedbackIngestToken``.

    Raises:
        ValueError: If ``expiration`` is neither ``None``, a ``datetime`` nor
            a ``timedelta``.
    """
    body: dict[str, Any] = {
        "run_id": run_id,
        "feedback_key": feedback_key,
        "feedback_config": feedback_config,
        "id": feedback_id or str(uuid.uuid4()),
    }

    if expiration is None:
        body["expires_in"] = ls_schemas.TimeDeltaInput(days=0, hours=3, minutes=0)
    elif isinstance(expiration, datetime.datetime):
        body["expires_at"] = expiration.isoformat()
    elif isinstance(expiration, datetime.timedelta):
        # Split the sub-day remainder into whole hours and whole minutes.
        whole_hours, leftover_seconds = divmod(expiration.seconds, 3600)
        body["expires_in"] = ls_schemas.TimeDeltaInput(
            days=expiration.days,
            hours=whole_hours,
            minutes=leftover_seconds // 60,
        )
    else:
        raise ValueError(
            f"Invalid expiration type: {type(expiration)}. "
            "Expected datetime.datetime or datetime.timedelta."
        )

    response = await self._arequest_with_retries(
        "POST",
        "/feedback/tokens",
        content=ls_client._dumps_json(body),
    )
    token = response.json()
    return ls_schemas.FeedbackIngestToken(**token)
async def read_feedback(
    self, feedback_id: ls_client.ID_TYPE
) -> ls_schemas.Feedback:
    """Fetch a single feedback entry by id as ``ls_schemas.Feedback``."""
    feedback_uuid = ls_client._as_uuid(feedback_id)
    response = await self._arequest_with_retries("GET", f"/feedback/{feedback_uuid}")
    payload = response.json()
    return ls_schemas.Feedback(**payload)
async def list_feedback(
    self,
    *,
    run_ids: Optional[Sequence[ls_client.ID_TYPE]] = None,
    feedback_key: Optional[Sequence[str]] = None,
    feedback_source_type: Optional[Sequence[ls_schemas.FeedbackSourceType]] = None,
    limit: Optional[int] = None,
    **kwargs: Any,
) -> AsyncIterator[ls_schemas.Feedback]:
    """Yield feedback entries from the paginated ``/feedback`` listing.

    Args:
        run_ids: Restrict the listing to these runs (sent as ``run``).
        feedback_key: Restrict the listing to these keys (sent as ``key``).
        feedback_source_type: Restrict the listing to these source types
            (sent as ``source``).
        limit: Maximum number of entries to yield. The page size requested
            from the API is ``min(limit, 100)``, or 100 when unset.
        **kwargs: Additional query parameters.
    """
    run_filter = (
        [str(ls_client._as_uuid(id_)) for id_ in run_ids] if run_ids else None
    )
    page_size = 100 if limit is None else min(limit, 100)
    params = {"run": run_filter, "limit": page_size, **kwargs}
    if feedback_key is not None:
        params["key"] = feedback_key
    if feedback_source_type is not None:
        params["source"] = feedback_source_type

    yielded = 0
    async for raw in self._aget_paginated_list("/feedback", params=params):
        yield ls_schemas.Feedback(**raw)
        yielded += 1
        if limit is not None and yielded >= limit:
            break
async def delete_feedback(self, feedback_id: ID_TYPE) -> None:
    """Issue a DELETE for the feedback entry with the given id.

    Args:
        feedback_id (Union[UUID, str]):
            The ID of the feedback to delete.

    Returns:
        None
    """
    target = f"/feedback/{ls_client._as_uuid(feedback_id, 'feedback_id')}"
    response = await self._arequest_with_retries("DELETE", target)
    ls_utils.raise_for_status_with_text(response)
887 # Annotation Queue API
async def list_annotation_queues(
    self,
    *,
    queue_ids: Optional[list[ID_TYPE]] = None,
    name: Optional[str] = None,
    name_contains: Optional[str] = None,
    limit: Optional[int] = None,
) -> AsyncIterator[ls_schemas.AnnotationQueue]:
    """Yield annotation queues from the paginated ``/annotation-queues`` listing.

    Args:
        queue_ids (Optional[List[Union[UUID, str]]]):
            The IDs of the queues to filter by.
        name (Optional[str]):
            The name of the queue to filter by.
        name_contains (Optional[str]):
            The substring that the queue name should contain.
        limit (Optional[int]):
            The maximum number of queues to yield. The page size requested
            from the API is ``min(limit, 100)``, or 100 when unset.

    Yields:
        The annotation queues.
    """
    if queue_ids is not None:
        id_filter = [
            ls_client._as_uuid(id_, f"queue_ids[{i}]")
            for i, id_ in enumerate(queue_ids)
        ]
    else:
        id_filter = None
    page_size = 100 if limit is None else min(limit, 100)
    params: dict = {
        "ids": id_filter,
        "name": name,
        "name_contains": name_contains,
        "limit": page_size,
    }

    yielded = 0
    async for raw_queue in self._aget_paginated_list(
        "/annotation-queues", params=params
    ):
        yield ls_schemas.AnnotationQueue(**raw_queue)
        yielded += 1
        if limit is not None and yielded >= limit:
            break
async def create_annotation_queue(
    self,
    *,
    name: str,
    description: Optional[str] = None,
    queue_id: Optional[ID_TYPE] = None,
) -> ls_schemas.AnnotationQueue:
    """Create an annotation queue by POSTing to ``/annotation-queues``.

    Args:
        name (str):
            The name of the annotation queue.
        description (Optional[str]):
            The description of the annotation queue; omitted from the
            request when ``None``.
        queue_id (Optional[Union[UUID, str]]):
            The ID of the annotation queue. A random UUID is generated when
            ``None``.

    Returns:
        AnnotationQueue: The API response parsed into a queue object.
    """
    identifier = queue_id if queue_id is not None else uuid.uuid4()
    candidate = {
        "name": name,
        "description": description,
        "id": str(identifier),
    }
    # Fields left as None are not sent at all.
    payload = {
        field: value for field, value in candidate.items() if value is not None
    }
    response = await self._arequest_with_retries(
        "POST",
        "/annotation-queues",
        json=payload,
    )
    ls_utils.raise_for_status_with_text(response)
    created = response.json()
    return ls_schemas.AnnotationQueue(**created)
async def read_annotation_queue(
    self, queue_id: ID_TYPE
) -> ls_schemas.AnnotationQueue:
    """Read an annotation queue with the specified `queue_id`.

    The queue is obtained by listing queues filtered to this single id and
    taking the first result.

    Args:
        queue_id (Union[UUID, str]): The ID of the annotation queue to read.

    Returns:
        AnnotationQueue: The annotation queue object.

    Raises:
        StopAsyncIteration: If the listing yields no queue.
    """
    # TODO: Replace when actual endpoint is added
    queues = self.list_annotation_queues(queue_ids=[queue_id])
    try:
        return await queues.__anext__()
    finally:
        # Only the first item is needed; close the generator now instead of
        # leaving it suspended mid-pagination until it is garbage-collected.
        await queues.aclose()
async def update_annotation_queue(
    self, queue_id: ID_TYPE, *, name: str, description: Optional[str] = None
) -> None:
    """Patch the name and description of an annotation queue.

    Args:
        queue_id (Union[UUID, str]): The ID of the annotation queue to update.
        name (str): The new name for the annotation queue.
        description (Optional[str]): The new description for the
            annotation queue. Defaults to None and is sent even when None.

    Returns:
        None
    """
    target = f"/annotation-queues/{ls_client._as_uuid(queue_id, 'queue_id')}"
    changes = {
        "name": name,
        "description": description,
    }
    response = await self._arequest_with_retries("PATCH", target, json=changes)
    ls_utils.raise_for_status_with_text(response)
async def delete_annotation_queue(self, queue_id: ID_TYPE) -> None:
    """Issue a DELETE for the annotation queue with the given id.

    The request carries an ``Accept: application/json`` header merged with
    the HTTP client's own headers.

    Args:
        queue_id (Union[UUID, str]): The ID of the annotation queue to delete.

    Returns:
        None
    """
    target = f"/annotation-queues/{ls_client._as_uuid(queue_id, 'queue_id')}"
    request_headers = {"Accept": "application/json", **self._client.headers}
    response = await self._arequest_with_retries(
        "DELETE",
        target,
        headers=request_headers,
    )
    ls_utils.raise_for_status_with_text(response)
async def add_runs_to_annotation_queue(
    self, queue_id: ID_TYPE, *, run_ids: list[ID_TYPE]
) -> None:
    """POST a list of run ids to an annotation queue.

    Args:
        queue_id (Union[UUID, str]): The ID of the annotation queue.
        run_ids (List[Union[UUID, str]]): The IDs of the runs to be added to
            the annotation queue; each is normalised to a UUID string.

    Returns:
        None
    """
    target = f"/annotation-queues/{ls_client._as_uuid(queue_id, 'queue_id')}/runs"
    run_id_strings = [
        str(ls_client._as_uuid(id_, f"run_ids[{i}]"))
        for i, id_ in enumerate(run_ids)
    ]
    response = await self._arequest_with_retries("POST", target, json=run_id_strings)
    ls_utils.raise_for_status_with_text(response)
async def delete_run_from_annotation_queue(
    self, queue_id: ID_TYPE, *, run_id: ID_TYPE
) -> None:
    """Remove a single run from an annotation queue.

    Args:
        queue_id (Union[UUID, str]): ID of the annotation queue.
        run_id (Union[UUID, str]): ID of the run to remove from the queue.

    Returns:
        None
    """
    queue_uuid = ls_client._as_uuid(queue_id, "queue_id")
    run_uuid = ls_client._as_uuid(run_id, "run_id")
    endpoint = f"/annotation-queues/{queue_uuid}/runs/{run_uuid}"
    resp = await self._arequest_with_retries("DELETE", endpoint)
    ls_utils.raise_for_status_with_text(resp)
async def get_run_from_annotation_queue(
    self, queue_id: ID_TYPE, *, index: int
) -> ls_schemas.RunWithAnnotationQueueInfo:
    """Fetch the run stored at a given position of an annotation queue.

    Args:
        queue_id (Union[UUID, str]): ID of the annotation queue.
        index (int): Position of the run within the queue.

    Returns:
        RunWithAnnotationQueueInfo: The run found at ``index``, built from
            the JSON body of the response.

    Raises:
        LangSmithNotFoundError: If the run is not found at the given index.
        LangSmithError: For other API-related errors.
    """
    queue_uuid = ls_client._as_uuid(queue_id, "queue_id")
    resp = await self._arequest_with_retries(
        "GET", f"/annotation-queues/{queue_uuid}/run/{index}"
    )
    ls_utils.raise_for_status_with_text(resp)
    payload = resp.json()
    return ls_schemas.RunWithAnnotationQueueInfo(**payload)
@ls_beta.warn_beta
async def index_dataset(
    self,
    *,
    dataset_id: ls_client.ID_TYPE,
    tag: str = "latest",
    **kwargs: Any,
) -> None:
    """Turn on indexing for a dataset; examples are indexed by their inputs.

    Once indexed, similar examples can be looked up by inputs through
    ``client.similar_examples()``.

    Args:
        dataset_id (UUID): ID of the dataset to index.
        tag (str, optional): Dataset version to index. With 'latest', any
            later additions, updates, or deletions of examples are
            reflected in the index.
        **kwargs (Any): Extra fields merged into the request body after
            ``tag``.

    Returns:
        None
    """
    dataset_uuid = ls_client._as_uuid(dataset_id, "dataset_id")
    body = {"tag": tag, **kwargs}
    response = await self._arequest_with_retries(
        "POST",
        f"/datasets/{dataset_uuid}/index",
        content=ls_client._dumps_json(body),
    )
    ls_utils.raise_for_status_with_text(response)
@ls_beta.warn_beta
async def sync_indexed_dataset(
    self,
    *,
    dataset_id: ls_client.ID_TYPE,
    **kwargs: Any,
) -> None:
    """Force a sync of a dataset's index.

    Syncing already happens automatically every 5 minutes; call this only
    when an immediate sync is needed.

    Args:
        dataset_id (UUID): ID of the dataset whose index should be synced.
        **kwargs (Any): Extra fields sent as the JSON request body.

    Returns:
        None
    """
    dataset_uuid = ls_client._as_uuid(dataset_id, "dataset_id")
    body = dict(kwargs)
    response = await self._arequest_with_retries(
        "POST",
        f"/datasets/{dataset_uuid}/index/sync",
        content=ls_client._dumps_json(body),
    )
    ls_utils.raise_for_status_with_text(response)
@ls_beta.warn_beta
async def similar_examples(
    self,
    inputs: dict,
    /,
    *,
    limit: int,
    dataset_id: ls_client.ID_TYPE,
    filter: Optional[str] = None,
    **kwargs: Any,
) -> list[ls_schemas.ExampleSearch]:
    """Retrieve the dataset examples whose inputs best match ``inputs``.

    !!! note

        Few-shot indexing must be enabled for the dataset. See
        `client.index_dataset()`.

    Args:
        inputs (dict): The inputs to use as a search query. Must match the
            dataset input schema. Must be JSON serializable.
        limit (int): The maximum number of examples to return.
        dataset_id (str or UUID): The ID of the dataset to search over.
        filter (str, optional): A filter string to apply to the search
            results. Uses the same syntax as the `filter` parameter in
            `list_runs()`. Only a subset of operations are supported.
            Only sent when non-empty. Defaults to None.
        kwargs (Any): Additional keyword args to pass as part of the
            request body.

    Returns:
        List of ExampleSearch objects, each tagged with the searched
        dataset's ID.

    Examples:
        ```python
        from langsmith import AsyncClient

        client = AsyncClient()
        await client.similar_examples(
            {"question": "When would i use the runnable generator"},
            limit=3,
            dataset_id="...",
        )
        ```

        ```python
        [
            ExampleSearch(
                inputs={"question": "What's a runnable lambda?"},
                outputs={
                    "answer": "A runnable lambda is an object that implements "
                    "LangChain's `Runnable` interface and runs a callable "
                    "(i.e., a function)."
                },
                metadata=None,
                id=UUID("f94104a7-2434-4ba7-8293-6a283f4860b4"),
                dataset_id=UUID("01b6ce0f-bfb6-4f48-bbb8-f19272135d40"),
            ),
            ...,
        ]
        ```
    """
    dataset_id = ls_client._as_uuid(dataset_id, "dataset_id")
    # kwargs are spread last, so they may override "inputs"/"limit".
    body = {"inputs": inputs, "limit": limit, **kwargs}
    if filter:
        body["filter"] = filter

    response = await self._arequest_with_retries(
        "POST",
        f"/datasets/{dataset_id}/search",
        content=ls_client._dumps_json(body),
    )
    ls_utils.raise_for_status_with_text(response)
    return [
        ls_schemas.ExampleSearch(**raw, dataset_id=dataset_id)
        for raw in response.json()["examples"]
    ]
async def _get_settings(self) -> ls_schemas.LangSmithSettings:
    """Return the current tenant's settings, fetching them at most once.

    The first successful call requests ``/settings`` and stores the parsed
    result on the instance; later calls return the stored object.

    Returns:
        LangSmithSettings: The settings for the current tenant.
    """
    if self._settings is not None:
        return self._settings

    resp = await self._arequest_with_retries("GET", "/settings")
    ls_utils.raise_for_status_with_text(resp)
    self._settings = ls_schemas.LangSmithSettings(**resp.json())
    return self._settings
1258 async def _current_tenant_is_owner(self, owner: str) -> bool:
1259 """Check if the current workspace has the same handle as owner.
1261 Args:
1262 owner (str): The owner to check against.
1264 Returns:
1265 bool: True if the current tenant is the owner, False otherwise.
1266 """
1267 settings = await self._get_settings()
1268 return owner == "-" or settings.tenant_handle == owner
async def _owner_conflict_error(
    self, action: str, owner: str
) -> ls_utils.LangSmithUserError:
    """Build (without raising) the error for acting on another tenant.

    Args:
        action (str): Description of the attempted action, inserted after
            "Cannot" in the message.
        owner (str): The tenant handle that was requested.

    Returns:
        LangSmithUserError: Error naming the current and requested tenants.
    """
    current = await self._get_settings()
    message = (
        f"Cannot {action} for another tenant.\n"
        f"Current tenant: {current.tenant_handle},\n"
        f"Requested tenant: {owner}"
    )
    return ls_utils.LangSmithUserError(message)
1280 async def _get_latest_commit_hash(
1281 self, prompt_owner_and_name: str, limit: int = 1, offset: int = 0
1282 ) -> Optional[str]:
1283 """Get the latest commit hash for a prompt.
1285 Args:
1286 prompt_owner_and_name (str): The owner and name of the prompt.
1287 limit (int, default=1): The maximum number of commits to fetch. Defaults to 1.
1288 offset (int, default=0): The number of commits to skip. Defaults to 0.
1290 Returns:
1291 Optional[str]: The latest commit hash, or None if no commits are found.
1292 """
1293 response = await self._arequest_with_retries(
1294 "GET",
1295 f"/commits/{prompt_owner_and_name}/",
1296 params={"limit": limit, "offset": offset},
1297 )
1298 commits = response.json()["commits"]
1299 return commits[0]["commit_hash"] if commits else None
async def _like_or_unlike_prompt(
    self, prompt_identifier: str, like: bool
) -> dict[str, int]:
    """Set or clear the caller's like on a prompt.

    Args:
        prompt_identifier (str): The identifier of the prompt.
        like (bool): True to like the prompt, False to unlike it.

    Returns:
        The decoded JSON response: a dictionary with the key 'likes' and
        the count of likes as the value.

    Raises:
        An HTTP status error from ``response.raise_for_status()`` if the
        prompt is not found or another error occurs.
    """
    repo_owner, repo_name, _ = ls_utils.parse_prompt_identifier(prompt_identifier)
    endpoint = f"/likes/{repo_owner}/{repo_name}"
    resp = await self._arequest_with_retries("POST", endpoint, json={"like": like})
    resp.raise_for_status()
    return resp.json()
async def _get_prompt_url(self, prompt_identifier: str) -> str:
    """Build the web URL for a prompt commit.

    Args:
        prompt_identifier (str): The identifier of the prompt.

    Returns:
        str: A hub URL when the prompt belongs to another tenant, otherwise
            a workspace prompts URL that carries the organization ID. Only
            the first eight characters of the commit hash are included.
    """
    owner, prompt_name, commit_hash = ls_utils.parse_prompt_identifier(
        prompt_identifier
    )
    short_hash = commit_hash[:8]

    # _current_tenant_is_owner is a coroutine function; it must be awaited.
    # An un-awaited coroutine object is always truthy, which made this
    # branch unreachable and left the coroutine never awaited.
    if not (await self._current_tenant_is_owner(owner)):
        return f"{self._host_url}/hub/{owner}/{prompt_name}:{short_hash}"

    settings = await self._get_settings()
    return (
        f"{self._host_url}/prompts/{prompt_name}/{short_hash}"
        f"?organizationId={settings.id}"
    )
1347 async def _prompt_exists(self, prompt_identifier: str) -> bool:
1348 """Check if a prompt exists.
1350 Args:
1351 prompt_identifier (str): The identifier of the prompt.
1353 Returns:
1354 bool: True if the prompt exists, False otherwise.
1355 """
1356 prompt = await self.get_prompt(prompt_identifier)
1357 return True if prompt else False
async def like_prompt(self, prompt_identifier: str) -> dict[str, int]:
    """Register a like on a prompt.

    Args:
        prompt_identifier (str): The identifier of the prompt.

    Returns:
        Dict[str, int]: A dictionary with the key 'likes' and the count of
            likes as the value.
    """
    outcome = await self._like_or_unlike_prompt(prompt_identifier, like=True)
    return outcome
async def unlike_prompt(self, prompt_identifier: str) -> dict[str, int]:
    """Withdraw a like from a prompt.

    Args:
        prompt_identifier (str): The identifier of the prompt.

    Returns:
        Dict[str, int]: A dictionary with the key 'likes' and the count of
            likes as the value.
    """
    outcome = await self._like_or_unlike_prompt(prompt_identifier, like=False)
    return outcome
async def list_prompts(
    self,
    *,
    limit: int = 100,
    offset: int = 0,
    is_public: Optional[bool] = None,
    is_archived: Optional[bool] = False,
    sort_field: ls_schemas.PromptSortField = ls_schemas.PromptSortField.updated_at,
    sort_direction: Literal["desc", "asc"] = "desc",
    query: Optional[str] = None,
) -> ls_schemas.ListPromptsResponse:
    """Fetch one page of prompts.

    Args:
        limit (int, default=100): The maximum number of prompts to return.
        offset (int, default=0): The number of prompts to skip.
        is_public (Optional[bool]): Filter prompts by if they are public.
            Omitted from the request when None.
        is_archived (Optional[bool]): Filter prompts by if they are
            archived. Any falsy value, including None, is sent as "false".
        sort_field (PromptSortField): The field to sort by.
            Defaults to "updated_at".
        sort_direction (Literal["desc", "asc"], default="desc"): The order
            to sort by.
        query (Optional[str]): Filter prompts by a search query. A
            non-empty query also enables prefix matching.

    Returns:
        ListPromptsResponse: A response object containing the list of
            prompts.
    """
    # Booleans are sent as the lowercase strings "true"/"false".
    if is_public is None:
        public_flag = None
    else:
        public_flag = "true" if is_public else "false"

    if isinstance(sort_field, ls_schemas.PromptSortField):
        sort_key = sort_field.value
    else:
        sort_key = sort_field

    query_params = {
        "limit": limit,
        "offset": offset,
        "is_public": public_flag,
        "is_archived": "true" if is_archived else "false",
        "sort_field": sort_key,
        "sort_direction": sort_direction,
        "query": query,
        "match_prefix": "true" if query else None,
    }

    resp = await self._arequest_with_retries(
        "GET", "/repos/", params=_exclude_none(query_params)
    )
    return ls_schemas.ListPromptsResponse(**resp.json())
async def get_prompt(self, prompt_identifier: str) -> Optional[ls_schemas.Prompt]:
    """Look up a single prompt by its identifier.

    Args:
        prompt_identifier (str): The identifier of the prompt.
            The identifier should be in the format "prompt_name" or
            "owner/prompt_name".

    Returns:
        Optional[Prompt]: The prompt object, or None when the lookup raises
            ``LangSmithNotFoundError``.
    """
    repo_owner, repo_name, _ = ls_utils.parse_prompt_identifier(prompt_identifier)
    endpoint = f"/repos/{repo_owner}/{repo_name}"
    try:
        resp = await self._arequest_with_retries("GET", endpoint)
        return ls_schemas.Prompt(**resp.json()["repo"])
    except ls_utils.LangSmithNotFoundError:
        # A missing prompt is reported as None rather than an error.
        return None
async def create_prompt(
    self,
    prompt_identifier: str,
    *,
    description: Optional[str] = None,
    readme: Optional[str] = None,
    tags: Optional[Sequence[str]] = None,
    is_public: bool = False,
) -> ls_schemas.Prompt:
    """Create a new, empty prompt.

    No prompt object is attached; only the prompt entry is created.

    Args:
        prompt_identifier (str): The identifier of the prompt.
            The identifier should be in the format of owner/name:hash,
            name:hash, owner/name, or name.
        description (Optional[str]): A description of the prompt.
        readme (Optional[str]): A readme for the prompt.
        tags (Optional[Sequence[str]]): A list of tags for the prompt.
        is_public (bool): Whether the prompt should be public.
            Defaults to False.

    Returns:
        Prompt: The created prompt object.

    Raises:
        LangSmithUserError: If a public prompt is requested but the tenant
            has no handle, or if the current tenant is not the owner.
        HTTPError: If the server request fails.
    """
    tenant_settings = await self._get_settings()
    if is_public and not tenant_settings.tenant_handle:
        raise ls_utils.LangSmithUserError(
            "Cannot create a public prompt without first\n"
            "creating a LangChain Hub handle. "
            "You can add a handle by creating a public prompt at:\n"
            "https://smith.langchain.com/prompts"
        )

    owner, prompt_name, _ = ls_utils.parse_prompt_identifier(prompt_identifier)
    owns_prompt = await self._current_tenant_is_owner(owner=owner)
    if not owns_prompt:
        raise (await self._owner_conflict_error("create a prompt", owner))

    # Unset or empty optional fields are sent as empty values, not null.
    payload: dict[str, Union[str, bool, Sequence[str]]] = {
        "repo_handle": prompt_name,
        "description": description or "",
        "readme": readme or "",
        "tags": tags or [],
        "is_public": is_public,
    }

    resp = await self._arequest_with_retries("POST", "/repos/", json=payload)
    resp.raise_for_status()
    return ls_schemas.Prompt(**resp.json()["repo"])
async def create_commit(
    self,
    prompt_identifier: str,
    object: Any,
    *,
    parent_commit_hash: Optional[str] = None,
) -> str:
    """Create a commit for an existing prompt.

    Args:
        prompt_identifier (str): The identifier of the prompt.
        object (Any): The LangChain object to commit.
        parent_commit_hash (Optional[str]): The hash of the parent commit.
            None or "latest" resolves to the prompt's latest commit.

    Returns:
        str: The url of the prompt commit.

    Raises:
        LangSmithNotFoundError: If the prompt does not exist.
        ImportError: If ``langchain-core`` is not installed.
        HTTPError: If the server request fails.
    """
    if not (await self._prompt_exists(prompt_identifier)):
        raise ls_utils.LangSmithNotFoundError(
            "Prompt does not exist, you must create it first."
        )

    try:
        from langchain_core.load import dumps
    except ImportError as exc:
        # Adjacent literals are concatenated verbatim, so the trailing
        # space is needed to avoid "langchain-corepackage".
        raise ImportError(
            "The client.create_commit function requires the langchain-core "
            "package to run.\nInstall with `pip install langchain-core`"
        ) from exc

    chain_to_push = ls_client.prep_obj_for_push(object)
    json_object = dumps(chain_to_push)
    manifest_dict = json.loads(json_object)

    owner, prompt_name, _ = ls_utils.parse_prompt_identifier(prompt_identifier)
    prompt_owner_and_name = f"{owner}/{prompt_name}"

    if parent_commit_hash == "latest" or parent_commit_hash is None:
        parent_commit_hash = await self._get_latest_commit_hash(
            prompt_owner_and_name
        )

    request_dict = {"parent_commit": parent_commit_hash, "manifest": manifest_dict}
    response = await self._arequest_with_retries(
        "POST", f"/commits/{prompt_owner_and_name}", json=request_dict
    )

    commit_hash = response.json()["commit"]["commit_hash"]

    return await self._get_prompt_url(f"{prompt_owner_and_name}:{commit_hash}")
async def update_prompt(
    self,
    prompt_identifier: str,
    *,
    description: Optional[str] = None,
    readme: Optional[str] = None,
    tags: Optional[Sequence[str]] = None,
    is_public: Optional[bool] = None,
    is_archived: Optional[bool] = None,
) -> dict[str, Any]:
    """Update a prompt's metadata.

    To change a prompt's content, use push_prompt or create_commit instead.

    Args:
        prompt_identifier (str): The identifier of the prompt to update.
        description (Optional[str]): New description for the prompt.
        readme (Optional[str]): New readme for the prompt.
        tags (Optional[Sequence[str]]): New list of tags for the prompt.
        is_public (Optional[bool]): New public status for the prompt.
        is_archived (Optional[bool]): New archived status for the prompt.

    Returns:
        Dict[str, Any]: The updated prompt data as returned by the server.

    Raises:
        ValueError: If the prompt is being made public while the tenant
            has no handle.
        HTTPError: If the server request fails.
    """
    tenant_settings = await self._get_settings()
    if is_public and not tenant_settings.tenant_handle:
        raise ValueError(
            "Cannot create a public prompt without first\n"
            "creating a LangChain Hub handle. "
            "You can add a handle by creating a public prompt at:\n"
            "https://smith.langchain.com/prompts"
        )

    # Only fields the caller actually supplied are sent to the server.
    candidates = {
        "description": description,
        "readme": readme,
        "is_public": is_public,
        "is_archived": is_archived,
        "tags": tags,
    }
    payload: dict[str, Union[str, bool, Sequence[str]]] = {
        field: value for field, value in candidates.items() if value is not None
    }

    owner, prompt_name, _ = ls_utils.parse_prompt_identifier(prompt_identifier)
    resp = await self._arequest_with_retries(
        "PATCH", f"/repos/{owner}/{prompt_name}", json=payload
    )
    resp.raise_for_status()
    return resp.json()
async def delete_prompt(self, prompt_identifier: str) -> None:
    """Delete a prompt owned by the current tenant.

    Args:
        prompt_identifier (str): The identifier of the prompt to delete.

    Returns:
        None

    Raises:
        LangSmithUserError: If the current tenant is not the owner of the
            prompt.
    """
    owner, prompt_name, _ = ls_utils.parse_prompt_identifier(prompt_identifier)
    owns_prompt = await self._current_tenant_is_owner(owner)
    if not owns_prompt:
        raise (await self._owner_conflict_error("delete a prompt", owner))

    endpoint = f"/repos/{owner}/{prompt_name}"
    resp = await self._arequest_with_retries("DELETE", endpoint)
    resp.raise_for_status()
async def pull_prompt_commit(
    self,
    prompt_identifier: str,
    *,
    include_model: Optional[bool] = False,
) -> ls_schemas.PromptCommit:
    """Fetch one prompt commit from the LangSmith API.

    Args:
        prompt_identifier (str): The identifier of the prompt.
        include_model (Optional[bool]): When truthy,
            ``?include_model=true`` is appended to the request URL.

    Returns:
        PromptCommit: The prompt object.
    """
    owner, prompt_name, commit_hash = ls_utils.parse_prompt_identifier(
        prompt_identifier
    )
    endpoint = f"/commits/{owner}/{prompt_name}/{commit_hash}"
    if include_model:
        endpoint += "?include_model=true"

    resp = await self._arequest_with_retries("GET", endpoint)
    # Response fields are spread last and so override owner/repo on clash.
    fields = {"owner": owner, "repo": prompt_name, **resp.json()}
    return ls_schemas.PromptCommit(**fields)
async def list_prompt_commits(
    self,
    prompt_identifier: str,
    *,
    limit: Optional[int] = None,
    offset: int = 0,
    include_model: bool = False,
) -> AsyncGenerator[ls_schemas.ListedPromptCommit, None]:
    """List commits for a given prompt.

    Args:
        prompt_identifier (str): The identifier of the prompt in the format
            'owner/repo_name'.
        limit (Optional[int]): The maximum number of commits to return. If
            None, returns all commits. Defaults to None.
        offset (int, default=0): The number of commits to skip before
            starting to return results.
        include_model (bool, default=False): Whether to include the model
            information in the commit data.

    Yields:
        A ListedPromptCommit object for each commit.

    !!! note

        This method uses pagination to retrieve commits. It will make
        multiple API calls if necessary to retrieve all commits or up to
        the specified limit.
    """
    owner, prompt_name, _ = ls_utils.parse_prompt_identifier(prompt_identifier)

    params: dict[str, Any] = {"offset": offset, "include_model": include_model}
    if limit is not None:
        # At most 100 commits are requested per page. When no limit is
        # given the key is left out entirely: httpx would otherwise send
        # an empty "limit=" query parameter for a None value.
        params["limit"] = min(100, limit)

    yielded = 0
    while True:
        params["offset"] = offset
        response = await self._arequest_with_retries(
            "GET",
            f"/commits/{owner}/{prompt_name}/",
            params=params,
        )
        val = response.json()
        items = val["commits"]
        total = val["total"]

        if not items:
            break
        for it in items:
            if limit is not None and yielded >= limit:
                return  # Stop iteration if we've reached the limit
            yield ls_schemas.ListedPromptCommit(
                **{"owner": owner, "repo": prompt_name, **it}
            )
            yielded += 1

        # Advance past the page just consumed; stop once the server's
        # reported total has been covered.
        offset += len(items)
        if offset >= total:
            break
async def pull_prompt(
    self, prompt_identifier: str, *, include_model: Optional[bool] = False
) -> Any:
    """Pull a prompt and return it as a LangChain `PromptTemplate`.

    This method requires [`langchain-core`](https://pypi.org/project/langchain-core).

    Args:
        prompt_identifier: The identifier of the prompt.
        include_model: Whether to include the model information in the
            prompt data.

    Returns:
        Any: The object loaded from the commit manifest. When it is, or
            starts with, a prompt template, that template's metadata is
            tagged with the hub owner, repo and commit hash.

    Raises:
        ImportError: If ``langchain-core`` is not installed.
    """
    try:
        from langchain_core.language_models.base import BaseLanguageModel
        from langchain_core.load.load import loads
        from langchain_core.output_parsers import BaseOutputParser
        from langchain_core.prompts import BasePromptTemplate
        from langchain_core.prompts.structured import StructuredPrompt
        from langchain_core.runnables.base import RunnableBinding, RunnableSequence
    except ImportError as exc:
        # Adjacent literals are concatenated verbatim, so the trailing
        # space is needed to avoid "langchain-corepackage".
        raise ImportError(
            "The client.pull_prompt function requires the langchain-core "
            "package to run.\nInstall with `pip install langchain-core`"
        ) from exc
    try:
        from langchain_core._api import suppress_langchain_beta_warning
    except ImportError:
        # Fall back to a no-op context manager when the helper is missing.
        @contextlib.contextmanager
        def suppress_langchain_beta_warning():
            yield

    prompt_object = await self.pull_prompt_commit(
        prompt_identifier, include_model=include_model
    )
    with suppress_langchain_beta_warning():
        prompt = loads(json.dumps(prompt_object.manifest))

    # Locate the prompt template: either the loaded object itself or the
    # first step of a sequence. Anything else carries no template to tag.
    if isinstance(prompt, BasePromptTemplate):
        prompt_template = prompt
    elif isinstance(prompt, RunnableSequence) and isinstance(
        prompt.first, BasePromptTemplate
    ):
        prompt_template = prompt.first
    else:
        prompt_template = None

    if prompt_template is not None:
        if prompt_template.metadata is None:
            prompt_template.metadata = {}
        prompt_template.metadata.update(
            {
                "lc_hub_owner": prompt_object.owner,
                "lc_hub_repo": prompt_object.repo,
                "lc_hub_commit_hash": prompt_object.commit_hash,
            }
        )

    # Transform 2-step RunnableSequence to 3-step for structured prompts
    if (
        include_model
        and isinstance(prompt, RunnableSequence)
        and isinstance(prompt.first, StructuredPrompt)
        # Make forward-compatible in case we let update the response type
        and (
            len(prompt.steps) == 2 and not isinstance(prompt.last, BaseOutputParser)
        )
    ):
        if isinstance(prompt.last, RunnableBinding) and isinstance(
            prompt.last.bound, BaseLanguageModel
        ):
            seq = cast(RunnableSequence, prompt.first | prompt.last.bound)
            if len(seq.steps) == 3:  # prompt | bound llm | output parser
                rebound_llm = seq.steps[1]
                # Re-apply the kwargs of the original binding to the model.
                prompt = RunnableSequence(
                    prompt.first,
                    rebound_llm.bind(**prompt.last.kwargs),
                    seq.last,
                )
            else:
                prompt = seq  # Not sure

        elif isinstance(prompt.last, BaseLanguageModel):
            prompt: RunnableSequence = prompt.first | prompt.last  # type: ignore[no-redef, assignment]
        else:
            pass

    return prompt
async def push_prompt(
    self,
    prompt_identifier: str,
    *,
    object: Optional[Any] = None,
    parent_commit_hash: str = "latest",
    is_public: Optional[bool] = None,
    description: Optional[str] = None,
    readme: Optional[str] = None,
    tags: Optional[Sequence[str]] = None,
) -> str:
    """Push a prompt to the LangSmith API.

    Use it to update prompt metadata, prompt content, or both. A prompt
    that does not exist yet is created; an existing prompt is updated.

    Args:
        prompt_identifier (str): The identifier of the prompt.
        object (Optional[Any]): The LangChain object to push. When None,
            no commit is created.
        parent_commit_hash (str): The parent commit hash.
            Defaults to "latest".
        is_public (Optional[bool]): Whether the prompt should be public.
            If None (default), the current visibility status is maintained
            for existing prompts. For new prompts, None defaults to
            private. Set to True to make public, or False to make private.
        description (Optional[str]): A description of the prompt.
        readme (Optional[str]): A readme for the prompt.
        tags (Optional[Sequence[str]]): A list of tags for the prompt.

    Returns:
        str: The URL of the prompt.
    """
    # Step 1: create the prompt, or update its metadata if it exists.
    already_exists = await self._prompt_exists(prompt_identifier)
    if not already_exists:
        await self.create_prompt(
            prompt_identifier,
            is_public=False if is_public is None else is_public,
            description=description,
            readme=readme,
            tags=tags,
        )
    else:
        metadata_fields = (is_public, description, readme, tags)
        # Skip the update call when no metadata was supplied.
        if not all(field is None for field in metadata_fields):
            await self.update_prompt(
                prompt_identifier,
                description=description,
                readme=readme,
                tags=tags,
                is_public=is_public,
            )

    # Step 2: without content there is nothing to commit.
    if object is None:
        return await self._get_prompt_url(prompt_identifier=prompt_identifier)

    # Step 3: commit the new manifest and hand back the commit URL.
    return await self.create_commit(
        prompt_identifier,
        object,
        parent_commit_hash=parent_commit_hash,
    )
1904def _exclude_none(d: dict) -> dict:
1905 """Exclude None values from a dictionary."""
1906 return {k: v for k, v in d.items() if v is not None}