Coverage for langsmith/async_client.py: 17%

487 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-12-11 16:15 -0800

1"""The Async LangSmith Client.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import contextlib 

7import datetime 

8import json 

9import uuid 

10import warnings 

11from collections.abc import AsyncGenerator, AsyncIterator, Mapping, Sequence 

12from typing import ( 

13 Any, 

14 Literal, 

15 Optional, 

16 Union, 

17 cast, 

18) 

19 

20import httpx 

21 

22from langsmith import client as ls_client 

23from langsmith import schemas as ls_schemas 

24from langsmith import utils as ls_utils 

25from langsmith._internal import _beta_decorator as ls_beta 

26 

# Identifier type used in this module's signatures: a ``uuid.UUID`` or a string.
ID_TYPE = Union[uuid.UUID, str]

28 

29 

30class AsyncClient: 

31 """Async Client for interacting with the LangSmith API.""" 

32 

33 __slots__ = ("_retry_config", "_client", "_web_url", "_settings") 

34 

def __init__(
    self,
    api_url: Optional[str] = None,
    api_key: Optional[str] = None,
    timeout_ms: Optional[
        Union[
            int, tuple[Optional[int], Optional[int], Optional[int], Optional[int]]
        ]
    ] = None,
    retry_config: Optional[Mapping[str, Any]] = None,
    web_url: Optional[str] = None,
):
    """Build the underlying ``httpx.AsyncClient`` and store the settings.

    Args:
        api_url: API base URL, resolved through ``ls_utils.get_api_url``.
        api_key: API key, resolved through ``ls_utils.get_api_key``. When a
            key is available it is sent in the ``ls_client.X_API_KEY`` header.
        timeout_ms: Either one int (milliseconds, used for the first slot of
            a four-element timeout tuple, the rest left as ``None``) or a
            four-element tuple of optional millisecond values. When omitted
            the timeout passed to httpx is ``10``.
        retry_config: Mapping read by ``_arequest_with_retries``; defaults to
            ``{"max_retries": 3}``.
        web_url: Stored as-is and used when computing ``_host_url``.
    """
    self._retry_config = retry_config or {"max_retries": 3}

    api_key = ls_utils.get_api_key(api_key)
    api_url = ls_utils.get_api_url(api_url)
    request_headers = {"Content-Type": "application/json"}
    if api_key:
        request_headers[ls_client.X_API_KEY] = api_key
    ls_client._validate_api_key_if_hosted(api_url, api_key)

    # httpx takes seconds, callers give milliseconds.
    timeout_: Union[tuple, float]
    if isinstance(timeout_ms, int):
        timeout_ = (timeout_ms / 1000, None, None, None)
    elif isinstance(timeout_ms, tuple):
        timeout_ = tuple(
            [None if ms is None else ms / 1000 for ms in timeout_ms]
        )
    else:
        timeout_ = 10

    self._client = httpx.AsyncClient(
        base_url=api_url,
        headers=request_headers,
        timeout=timeout_,
    )
    self._web_url = web_url
    self._settings: Optional[ls_schemas.LangSmithSettings] = None

69 

async def __aenter__(self) -> AsyncClient:
    """Return this client so it can be used in ``async with``."""
    return self

73 

async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Close the client when the ``async with`` block ends.

    The exception arguments are ignored; nothing is returned, so an
    exception raised inside the block is not suppressed.
    """
    await self.aclose()

77 

async def aclose(self):
    """Close the wrapped ``httpx.AsyncClient``."""
    http_client = self._client
    await http_client.aclose()

81 

@property
def _api_url(self):
    """The HTTP client's base URL, as a string."""
    base_url = self._client.base_url
    return str(base_url)

85 

@property
def _host_url(self) -> str:
    """The web host URL.

    Computed by ``ls_utils.get_host_url`` from the stored web URL and the
    client's API URL.
    """
    web_url, api_url = self._web_url, self._api_url
    return ls_utils.get_host_url(web_url, api_url)

90 

async def _arequest_with_retries(
    self,
    method: str,
    endpoint: str,
    **kwargs: Any,
) -> httpx.Response:
    """Send an HTTP request, retrying when the transport fails.

    Only ``httpx.RequestError`` is retried, sleeping ``2**attempt`` seconds
    between attempts. An HTTP error status is never retried; it is
    translated into a LangSmith exception immediately.

    Args:
        method: HTTP method passed to the underlying client.
        endpoint: Request path, relative to the client's base URL.
        **kwargs: Forwarded to ``httpx.AsyncClient.request``. When ``params``
            is a mapping, entries whose value is ``None`` are removed first;
            ``params=None`` is treated as "no params".

    Returns:
        The response, after it passed the status check.

    Raises:
        ls_utils.LangSmithAPIError: On a 500 response, or when the retry
            loop ends without making a request (``max_retries`` below 1).
        ls_utils.LangSmithRequestTimeout: On a 408 response.
        ls_utils.LangSmithRateLimitError: On a 429 response.
        ls_utils.LangSmithAuthError: On a 401 response.
        ls_utils.LangSmithNotFoundError: On a 404 response.
        ls_utils.LangSmithConflictError: On a 409 response.
        ls_utils.LangSmithError: On any other error status.
        ls_utils.LangSmithConnectionError: When the last attempt still fails
            with ``httpx.RequestError``.
    """
    max_retries = cast(int, self._retry_config.get("max_retries", 3))

    # Python requests library used by the normal Client filters out params
    # with None values. The httpx library does not. Filter them out here to
    # keep behavior consistent.
    params = kwargs.get("params")
    if params is None:
        # Covers both "not passed" and an explicit ``params=None``.
        kwargs.pop("params", None)
    elif isinstance(params, Mapping):
        kwargs["params"] = {k: v for k, v in params.items() if v is not None}
    # Any other params form (e.g. a list of pairs) is passed through as-is.

    for attempt in range(max_retries):
        try:
            response = await self._client.request(method, endpoint, **kwargs)
            ls_utils.raise_for_status_with_text(response)
            return response
        except httpx.HTTPStatusError as e:
            # Read the status from the exception itself so this does not
            # depend on the local ``response`` having been bound.
            status_code = e.response.status_code
            if status_code == 500:
                raise ls_utils.LangSmithAPIError(
                    f"Server error caused failure to {method}"
                    f" {endpoint} in"
                    f" LangSmith API. {repr(e)}"
                ) from e
            elif status_code == 408:
                raise ls_utils.LangSmithRequestTimeout(
                    f"Client took too long to send request to {method}{endpoint}"
                ) from e
            elif status_code == 429:
                raise ls_utils.LangSmithRateLimitError(
                    f"Rate limit exceeded for {endpoint}. {repr(e)}"
                ) from e
            elif status_code == 401:
                raise ls_utils.LangSmithAuthError(
                    f"Authentication failed for {endpoint}. {repr(e)}"
                ) from e
            elif status_code == 404:
                raise ls_utils.LangSmithNotFoundError(
                    f"Resource not found for {endpoint}. {repr(e)}"
                ) from e
            elif status_code == 409:
                raise ls_utils.LangSmithConflictError(
                    f"Conflict for {endpoint}. {repr(e)}"
                ) from e
            else:
                raise ls_utils.LangSmithError(
                    f"Failed to {method} {endpoint} in LangSmith API. {repr(e)}"
                ) from e
        except httpx.RequestError as e:
            if attempt == max_retries - 1:
                raise ls_utils.LangSmithConnectionError(
                    f"Request error: {repr(e)}"
                ) from e
            await asyncio.sleep(2**attempt)
    raise ls_utils.LangSmithAPIError(
        "Unexpected error connecting to the LangSmith API"
    )

150 

async def _aget_paginated_list(
    self,
    path: str,
    params: Optional[dict[str, Any]] = None,
) -> AsyncIterator[dict[str, Any]]:
    """Yield items from an offset-paginated GET endpoint.

    Pages are requested with ``limit`` / ``offset`` query parameters until a
    page comes back empty or shorter than ``limit``.

    Args:
        path: Endpoint path to request.
        params: Query parameters. ``offset`` (default 0) is the starting
            position and ``limit`` (default 100) the page size. The dict is
            copied, so the caller's object is never modified.

    Yields:
        Each item of every page, in the order received.
    """
    # Work on a private copy: the loop rewrites "offset" on every page and
    # fills in "limit", which must not leak into the caller's dict.
    query: dict[str, Any] = dict(params) if params else {}
    offset = query.get("offset", 0)
    if query.get("limit") is None:
        # An explicit None would break the page-size comparison below.
        query["limit"] = 100
    page_size = query["limit"]
    while True:
        query["offset"] = offset
        response = await self._arequest_with_retries("GET", path, params=query)
        items = response.json()
        if not items:
            break
        for item in items:
            yield item
        if len(items) < page_size:
            # A short page means there is nothing left to fetch.
            break
        offset += len(items)

171 

async def _aget_cursor_paginated_list(
    self,
    path: str,
    *,
    body: Optional[dict] = None,
    request_method: str = "POST",
    data_key: str = "runs",
) -> AsyncIterator[dict]:
    """Yield items from a cursor-paginated endpoint.

    Args:
        path: Endpoint path to request.
        body: Request body; a copy is sent, with ``cursor`` added for each
            page after the first.
        request_method: HTTP method used for every page.
        data_key: Key of the response object that holds the page's items.

    Yields:
        Each item found under ``data_key``, page by page, until a response
        is empty, has no items, or carries no ``cursors["next"]`` value.
    """
    payload = body.copy() if body else {}
    while True:
        response = await self._arequest_with_retries(
            request_method,
            path,
            content=ls_client._dumps_json(payload),
        )
        page = response.json()
        if not page:
            return
        records = page.get(data_key)
        if not records:
            return
        for record in records:
            yield record
        cursors = page.get("cursors")
        next_cursor = cursors.get("next") if cursors else None
        if not next_cursor:
            return
        payload["cursor"] = next_cursor

201 

async def create_run(
    self,
    name: str,
    inputs: dict[str, Any],
    run_type: str,
    *,
    project_name: Optional[str] = None,
    revision_id: Optional[ls_client.ID_TYPE] = None,
    **kwargs: Any,
) -> None:
    """Create a run by POSTing it to ``/runs``.

    Args:
        name: Name of the run.
        inputs: Inputs recorded for the run.
        run_type: Type of the run.
        project_name: Project to log to; when falsy the value of
            ``ls_utils.get_tracer_project()`` is used.
        revision_id: Optional revision identifier sent with the run.
        **kwargs: Extra fields merged into the payload; they override the
            fields above when keys collide. A missing or falsy ``id``
            (including an explicit ``id=None``) is replaced by a new UUID.
    """
    run_create = {
        "name": name,
        "id": None,  # placeholder, resolved after kwargs are merged
        "inputs": inputs,
        "run_type": run_type,
        "session_name": project_name or ls_utils.get_tracer_project(),
        "revision_id": revision_id,
        **kwargs,
    }
    # Resolve the id last so that ``id=None`` passed through kwargs cannot
    # overwrite the generated UUID.
    run_create["id"] = kwargs.get("id") or uuid.uuid4()
    await self._arequest_with_retries(
        "POST", "/runs", content=ls_client._dumps_json(run_create)
    )

225 

async def update_run(
    self,
    run_id: ls_client.ID_TYPE,
    **kwargs: Any,
) -> None:
    """PATCH a run with the given fields.

    Args:
        run_id: ID of the run to update; it is also written into the payload
            as ``id``, replacing any ``id`` passed in ``kwargs``.
        **kwargs: Fields to send in the update payload.
    """
    payload = dict(kwargs)
    payload["id"] = ls_client._as_uuid(run_id)
    endpoint = f"/runs/{ls_client._as_uuid(run_id)}"
    await self._arequest_with_retries(
        "PATCH",
        endpoint,
        content=ls_client._dumps_json(payload),
    )

238 

async def read_run(self, run_id: ls_client.ID_TYPE) -> ls_schemas.Run:
    """Fetch one run by ID and return it as a ``Run``."""
    endpoint = f"/runs/{ls_client._as_uuid(run_id)}"
    response = await self._arequest_with_retries("GET", endpoint)
    payload = response.json()
    return ls_schemas.Run(**payload)

246 

async def list_runs(
    self,
    *,
    project_id: Optional[
        Union[ls_client.ID_TYPE, Sequence[ls_client.ID_TYPE]]
    ] = None,
    project_name: Optional[Union[str, Sequence[str]]] = None,
    run_type: Optional[str] = None,
    trace_id: Optional[ls_client.ID_TYPE] = None,
    reference_example_id: Optional[ls_client.ID_TYPE] = None,
    query: Optional[str] = None,
    filter: Optional[str] = None,
    trace_filter: Optional[str] = None,
    tree_filter: Optional[str] = None,
    is_root: Optional[bool] = None,
    parent_run_id: Optional[ls_client.ID_TYPE] = None,
    start_time: Optional[datetime.datetime] = None,
    error: Optional[bool] = None,
    run_ids: Optional[Sequence[ls_client.ID_TYPE]] = None,
    select: Optional[Sequence[str]] = None,
    limit: Optional[int] = None,
    **kwargs: Any,
) -> AsyncIterator[ls_schemas.Run]:
    """List runs from the LangSmith API.

    Parameters
    ----------
    project_id : UUID, str, sequence of them, or None, default=None
        The ID(s) of the project to filter by. Any sequence (list, tuple,
        ...) of IDs is accepted.
    project_name : str, sequence of str, or None, default=None
        The name(s) of the project to filter by. Each name is resolved to
        an ID with `read_project`.
    run_type : str or None, default=None
        The type of the runs to filter by.
    trace_id : UUID or None, default=None
        The ID of the trace to filter by.
    reference_example_id : UUID or None, default=None
        The ID of the reference example to filter by.
    query : str or None, default=None
        The query string to filter by.
    filter : str or None, default=None
        The filter string to filter by.
    trace_filter : str or None, default=None
        Filter to apply to the ROOT run in the trace tree. This is meant to
        be used in conjunction with the regular `filter` parameter to let you
        filter runs by attributes of the root run within a trace.
    tree_filter : str or None, default=None
        Filter to apply to OTHER runs in the trace tree, including
        sibling and child runs. This is meant to be used in conjunction with
        the regular `filter` parameter to let you filter runs by attributes
        of any run within a trace.
    is_root : bool or None, default=None
        Whether to filter by root runs.
    parent_run_id : UUID or None, default=None
        The ID of the parent run to filter by.
    start_time : datetime or None, default=None
        The start time to filter by.
    error : bool or None, default=None
        Whether to filter by error status.
    run_ids : List[str or UUID] or None, default=None
        The IDs of the runs to filter by.
    select : List[str] or None, default=None
        The fields to request for each run. Selecting `child_run_ids`
        emits a `DeprecationWarning`.
    limit : int or None, default=None
        The maximum number of runs to return.
    **kwargs : Any
        Additional keyword arguments, merged into the query body.

    Yields:
    ------
    Run
        The runs.

    Examples:
    --------
    List all runs in a project:

    ```python
    project_runs = client.list_runs(project_name="<your_project>")
    ```

    List LLM and Chat runs in the last 24 hours:

    ```python
    todays_llm_runs = client.list_runs(
        project_name="<your_project>",
        start_time=datetime.now() - timedelta(days=1),
        run_type="llm",
    )
    ```

    List root traces in a project:

    ```python
    root_runs = client.list_runs(project_name="<your_project>", is_root=True)
    ```

    List runs without errors:

    ```python
    correct_runs = client.list_runs(project_name="<your_project>", error=False)
    ```

    List runs and only return their inputs/outputs (to speed up the query):

    ```python
    input_output_runs = client.list_runs(
        project_name="<your_project>", select=["inputs", "outputs"]
    )
    ```

    List runs by run ID:

    ```python
    run_ids = [
        "a36092d2-4ad5-4fb4-9c0d-0dba9a2ed836",
        "9398e6be-964f-4aa4-8ae9-ad78cd4b7074",
    ]
    selected_runs = client.list_runs(run_ids=run_ids)
    ```

    List all "chain" type runs that took more than 10 seconds and had
    `total_tokens` greater than 5000:

    ```python
    chain_runs = client.list_runs(
        project_name="<your_project>",
        filter='and(eq(run_type, "chain"), gt(latency, 10), gt(total_tokens, 5000))',
    )
    ```

    List all runs called "extractor" whose root of the trace was assigned
    feedback "user_score" score of 1:

    ```python
    good_extractor_runs = client.list_runs(
        project_name="<your_project>",
        filter='eq(name, "extractor")',
        trace_filter='and(eq(feedback_key, "user_score"), eq(feedback_score, 1))',
    )
    ```

    List all runs that started after a specific timestamp and either have
    "error" not equal to null or a "Correctness" feedback score equal to 0:

    ```python
    complex_runs = client.list_runs(
        project_name="<your_project>",
        filter='and(gt(start_time, "2023-07-15T12:34:56Z"), or(neq(error, null), and(eq(feedback_key, "Correctness"), eq(feedback_score, 0.0))))',
    )
    ```

    List all runs where `tags` include "experimental" or "beta" and
    `latency` is greater than 2 seconds:

    ```python
    tagged_runs = client.list_runs(
        project_name="<your_project>",
        filter='and(or(has(tags, "experimental"), has(tags, "beta")), gt(latency, 2))',
    )
    ```
    """  # noqa: E501
    project_ids: list = []
    if isinstance(project_id, (uuid.UUID, str)):
        project_ids.append(project_id)
    elif project_id is not None:
        # Accept any sequence, not just ``list``: silently dropping a tuple
        # of IDs would widen the query to every project.
        project_ids.extend(project_id)
    if project_name is not None:
        if isinstance(project_name, str):
            project_name = [project_name]
        projects = await asyncio.gather(
            *[self.read_project(project_name=name) for name in project_name]
        )
        project_ids.extend([project.id for project in projects])

    if select and "child_run_ids" in select:
        warnings.warn(
            "The child_run_ids field is deprecated and will be removed in following versions",
            DeprecationWarning,
        )

    body_query: dict[str, Any] = {
        "session": project_ids if project_ids else None,
        "run_type": run_type,
        "reference_example": (
            [reference_example_id] if reference_example_id else None
        ),
        "query": query,
        "filter": filter,
        "trace_filter": trace_filter,
        "tree_filter": tree_filter,
        "is_root": is_root,
        "parent_run": parent_run_id,
        "start_time": start_time.isoformat() if start_time else None,
        "error": error,
        "id": run_ids,
        "trace": trace_id,
        "select": select,
        "limit": limit,
        **kwargs,
    }
    if project_ids:
        # Set after the kwargs merge so resolved project IDs always win.
        body_query["session"] = [
            str(ls_client._as_uuid(id_)) for id_ in project_ids
        ]
    # Unset filters are left out of the request body entirely.
    body = {k: v for k, v in body_query.items() if v is not None}
    ix = 0
    async for run in self._aget_cursor_paginated_list("/runs/query", body=body):
        yield ls_schemas.Run(**run)
        ix += 1
        if limit is not None and ix >= limit:
            break

453 

async def share_run(
    self, run_id: ls_client.ID_TYPE, *, share_id: Optional[ls_client.ID_TYPE] = None
) -> str:
    """Create a public share link for a run.

    Args:
        run_id (ID_TYPE): The ID of the run to share.
        share_id (Optional[ID_TYPE], optional): Share token to request.
            A random UUID is used when it is not given.

    Returns:
        str: ``<host url>/public/<share_token>/r``, built from the token in
        the API response.

    Raises:
        Request failures surface as the exceptions raised by
        ``_arequest_with_retries``.
    """
    run_uuid = ls_client._as_uuid(run_id, "run_id")
    requested_token = str(share_id or uuid.uuid4())
    payload = {"run_id": str(run_uuid), "share_token": requested_token}
    response = await self._arequest_with_retries(
        "PUT",
        f"/runs/{run_uuid}/share",
        content=ls_client._dumps_json(payload),
    )
    ls_utils.raise_for_status_with_text(response)
    granted_token = response.json()["share_token"]
    return f"{self._host_url}/public/{granted_token}/r"

483 

async def run_is_shared(self, run_id: ls_client.ID_TYPE) -> bool:
    """Return True when ``read_run_shared_link`` finds a link for the run."""
    run_uuid = ls_client._as_uuid(run_id, "run_id")
    shared_link = await self.read_run_shared_link(run_uuid)
    return shared_link is not None

488 

async def read_run_shared_link(self, run_id: ls_client.ID_TYPE) -> Optional[str]:
    """Look up the public share link of a run.

    Args:
        run_id (ID_TYPE): The ID of the run.

    Returns:
        Optional[str]: ``<host url>/public/<share_token>/r`` when the API
        response contains a ``share_token``; ``None`` when the response is
        null or has no such key.

    Raises:
        Request failures surface as the exceptions raised by
        ``_arequest_with_retries``.
    """
    run_uuid = ls_client._as_uuid(run_id, "run_id")
    response = await self._arequest_with_retries("GET", f"/runs/{run_uuid}/share")
    ls_utils.raise_for_status_with_text(response)
    payload = response.json()
    if payload is not None and "share_token" in payload:
        return f"{self._host_url}/public/{payload['share_token']}/r"
    return None

511 

async def create_project(
    self,
    project_name: str,
    **kwargs: Any,
) -> ls_schemas.TracerSession:
    """Create a project by POSTing to ``/sessions``.

    Args:
        project_name: Sent as the ``name`` field.
        **kwargs: Extra fields merged into the payload (a ``name`` key here
            overrides ``project_name``).

    Returns:
        A ``TracerSession`` built from the response JSON.
    """
    payload = {"name": project_name}
    payload.update(kwargs)
    response = await self._arequest_with_retries(
        "POST",
        "/sessions",
        content=ls_client._dumps_json(payload),
    )
    return ls_schemas.TracerSession(**response.json())

523 

async def read_project(
    self,
    project_name: Optional[str] = None,
    project_id: Optional[ls_client.ID_TYPE] = None,
) -> ls_schemas.TracerSession:
    """Fetch a project by ID or by name.

    ``project_id`` takes precedence when both are given.

    Args:
        project_name: Name to look up through the ``/sessions`` listing.
        project_id: ID to fetch directly.

    Returns:
        A ``TracerSession``; for a list response, the first entry.

    Raises:
        ValueError: If neither argument is truthy.
        ls_utils.LangSmithNotFoundError: If the response is an empty list.
    """
    if not project_id and not project_name:
        raise ValueError("Either project_name or project_id must be provided")

    if project_id:
        response = await self._arequest_with_retries(
            "GET", f"/sessions/{ls_client._as_uuid(project_id)}"
        )
    else:
        response = await self._arequest_with_retries(
            "GET", "/sessions", params={"name": project_name}
        )

    payload = response.json()
    if not isinstance(payload, list):
        return ls_schemas.TracerSession(**payload)
    if not payload:
        raise ls_utils.LangSmithNotFoundError(
            f"Project {project_name} not found"
        )
    return ls_schemas.TracerSession(**payload[0])

549 

async def delete_project(
    self, *, project_name: Optional[str] = None, project_id: Optional[str] = None
) -> None:
    """Delete a project from LangSmith.

    Parameters
    ----------
    project_name : str or None, default=None
        The name of the project to delete. Only used, via `read_project`,
        when `project_id` is None.
    project_id : str or None, default=None
        The ID of the project to delete.

    Raises
    ------
    ValueError
        If both arguments are None, or if the project ID is empty.
    """
    if project_id is None:
        if project_name is None:
            raise ValueError("Either project_name or project_id must be provided")
        found = await self.read_project(project_name=project_name)
        project_id = str(found.id)
    if not project_id:
        raise ValueError("Project not found")
    endpoint = f"/sessions/{ls_client._as_uuid(project_id)}"
    await self._arequest_with_retries("DELETE", endpoint)

573 

async def create_dataset(
    self,
    dataset_name: str,
    **kwargs: Any,
) -> ls_schemas.Dataset:
    """Create a dataset by POSTing to ``/datasets``.

    Args:
        dataset_name: Sent as the ``name`` field.
        **kwargs: Extra fields merged into the payload (a ``name`` key here
            overrides ``dataset_name``).

    Returns:
        A ``Dataset`` built from the response JSON.
    """
    payload = {"name": dataset_name}
    payload.update(kwargs)
    response = await self._arequest_with_retries(
        "POST",
        "/datasets",
        content=ls_client._dumps_json(payload),
    )
    return ls_schemas.Dataset(**response.json())

585 

async def read_dataset(
    self,
    dataset_name: Optional[str] = None,
    dataset_id: Optional[ls_client.ID_TYPE] = None,
) -> ls_schemas.Dataset:
    """Fetch a dataset by ID or by name.

    ``dataset_id`` takes precedence when both are given.

    Args:
        dataset_name: Name to look up through the ``/datasets`` listing.
        dataset_id: ID to fetch directly.

    Returns:
        A ``Dataset``; for a list response, the first entry.

    Raises:
        ValueError: If neither argument is truthy.
        ls_utils.LangSmithNotFoundError: If the response is an empty list.
    """
    if not dataset_id and not dataset_name:
        raise ValueError("Either dataset_name or dataset_id must be provided")

    if dataset_id:
        response = await self._arequest_with_retries(
            "GET", f"/datasets/{ls_client._as_uuid(dataset_id)}"
        )
    else:
        response = await self._arequest_with_retries(
            "GET", "/datasets", params={"name": dataset_name}
        )

    payload = response.json()
    if not isinstance(payload, list):
        return ls_schemas.Dataset(**payload)
    if not payload:
        raise ls_utils.LangSmithNotFoundError(
            f"Dataset {dataset_name} not found"
        )
    return ls_schemas.Dataset(**payload[0])

611 

async def delete_dataset(self, dataset_id: ls_client.ID_TYPE) -> None:
    """Send a DELETE request for the dataset with the given ID."""
    endpoint = f"/datasets/{ls_client._as_uuid(dataset_id)}"
    await self._arequest_with_retries("DELETE", endpoint)

618 

async def list_datasets(
    self,
    **kwargs: Any,
) -> AsyncIterator[ls_schemas.Dataset]:
    """Yield every dataset from the paginated ``/datasets`` listing.

    Args:
        **kwargs: Passed as query parameters to ``_aget_paginated_list``.

    Yields:
        A ``Dataset`` per returned record.
    """
    records = self._aget_paginated_list("/datasets", params=kwargs)
    async for record in records:
        yield ls_schemas.Dataset(**record)

626 

async def create_example(
    self,
    inputs: dict[str, Any],
    outputs: Optional[dict[str, Any]] = None,
    dataset_id: Optional[ls_client.ID_TYPE] = None,
    dataset_name: Optional[str] = None,
    **kwargs: Any,
) -> ls_schemas.Example:
    """Create an example in a dataset.

    Args:
        inputs: Inputs of the example.
        outputs: Outputs of the example, if any.
        dataset_id: Target dataset ID. When None, the dataset is looked up
            by ``dataset_name`` with ``read_dataset``.
        dataset_name: Target dataset name, used only without ``dataset_id``.
        **kwargs: Extra fields merged into the payload.

    Returns:
        An ``Example`` built from the response JSON.

    Raises:
        ValueError: If both ``dataset_id`` and ``dataset_name`` are None.
    """
    if dataset_id is None:
        if dataset_name is None:
            raise ValueError("Either dataset_id or dataset_name must be provided")
        resolved = await self.read_dataset(dataset_name=dataset_name)
        dataset_id = resolved.id

    payload = {
        "inputs": inputs,
        "outputs": outputs,
        "dataset_id": str(dataset_id),
    }
    payload.update(kwargs)
    response = await self._arequest_with_retries(
        "POST",
        "/examples",
        content=ls_client._dumps_json(payload),
    )
    return ls_schemas.Example(**response.json())

652 

async def read_example(self, example_id: ls_client.ID_TYPE) -> ls_schemas.Example:
    """Fetch one example by ID and return it as an ``Example``."""
    endpoint = f"/examples/{ls_client._as_uuid(example_id)}"
    response = await self._arequest_with_retries("GET", endpoint)
    payload = response.json()
    return ls_schemas.Example(**payload)

659 

async def list_examples(
    self,
    *,
    dataset_id: Optional[ls_client.ID_TYPE] = None,
    dataset_name: Optional[str] = None,
    **kwargs: Any,
) -> AsyncIterator[ls_schemas.Example]:
    """Yield examples from the paginated ``/examples`` listing.

    Args:
        dataset_id: Restrict to this dataset; takes precedence over
            ``dataset_name``.
        dataset_name: Restrict to the dataset with this name, resolved with
            ``read_dataset``.
        **kwargs: Additional query parameters.

    Yields:
        An ``Example`` per returned record.
    """
    query = dict(kwargs)
    if dataset_id:
        query["dataset"] = ls_client._as_uuid(dataset_id)
    elif dataset_name:
        resolved = await self.read_dataset(dataset_name=dataset_name)
        query["dataset"] = resolved.id

    async for record in self._aget_paginated_list("/examples", params=query):
        yield ls_schemas.Example(**record)

677 

async def create_feedback(
    self,
    run_id: Optional[ls_client.ID_TYPE],
    key: str,
    score: Optional[float] = None,
    value: Optional[Any] = None,
    comment: Optional[str] = None,
    **kwargs: Any,
) -> ls_schemas.Feedback:
    """Create feedback by POSTing to ``/feedback``.

    Args:
        run_id (Optional[ls_client.ID_TYPE]): The ID of the run the feedback
            is for. Can be None for project-level feedback.
        key (str): The name of the metric or aspect this feedback is about.
        score (Optional[float]): The score for this metric or aspect.
        value (Optional[Any]): The display value or non-numeric value.
        comment (Optional[str]): A comment about this feedback.
        **kwargs: Extra fields merged into the payload; they override the
            fields above when keys collide.

    Returns:
        ls_schemas.Feedback: Built from the response JSON.

    Raises:
        Request failures surface as the exceptions raised by
        ``_arequest_with_retries``.
    """
    payload = {
        "run_id": ls_client._ensure_uuid(run_id, accept_null=True),
        "key": key,
        "score": score,
        "value": value,
        "comment": comment,
    }
    payload.update(kwargs)
    response = await self._arequest_with_retries(
        "POST",
        "/feedback",
        content=ls_client._dumps_json(payload),
    )
    return ls_schemas.Feedback(**response.json())

716 

async def create_feedback_from_token(
    self,
    token_or_url: Union[str, uuid.UUID],
    score: Union[float, int, bool, None] = None,
    *,
    value: Union[float, int, bool, str, dict, None] = None,
    correction: Union[dict, None] = None,
    comment: Union[str, None] = None,
    metadata: Optional[dict] = None,
) -> None:
    """Submit feedback through a presigned token or URL.

    Args:
        token_or_url (Union[str, uuid.UUID]): The token, or a URL containing
            it, parsed with ``ls_client._parse_token_or_url``.
        score (Union[float, int, bool, None], optional): The feedback score.
            Defaults to None.
        value (Union[float, int, bool, str, dict, None], optional): The
            feedback value. Defaults to None.
        correction (Union[dict, None], optional): The feedback correction.
            Defaults to None.
        comment (Union[str, None], optional): The feedback comment.
            Defaults to None.
        metadata (Optional[dict], optional): Additional metadata.
            Defaults to None.

    Raises:
        ValueError: If the parsed source API URL differs from this client's
            API URL.

    Returns:
        None: This method does not return anything.
    """
    own_api_url = self._api_url
    source_api_url, token_uuid = ls_client._parse_token_or_url(
        token_or_url, own_api_url, num_parts=1
    )
    if source_api_url != self._api_url:
        raise ValueError(f"Invalid source API URL. {source_api_url}")

    # TODO: Add ID once the API supports it.
    payload = {
        "score": score,
        "value": value,
        "correction": correction,
        "comment": comment,
        "metadata": metadata,
    }
    response = await self._arequest_with_retries(
        "POST",
        f"/feedback/tokens/{ls_client._as_uuid(token_uuid)}",
        content=ls_client._dumps_json(payload),
    )
    ls_utils.raise_for_status_with_text(response)

769 

async def create_presigned_feedback_token(
    self,
    run_id: ls_client.ID_TYPE,
    feedback_key: str,
    *,
    expiration: Optional[datetime.datetime | datetime.timedelta] = None,
    feedback_config: Optional[ls_schemas.FeedbackConfig] = None,
    feedback_id: Optional[ls_client.ID_TYPE] = None,
) -> ls_schemas.FeedbackIngestToken:
    """Create a pre-signed URL to send feedback data to.

    This is useful for giving browser-based clients a way to upload
    feedback data directly to LangSmith without accessing the
    API key.

    Args:
        run_id: Sent as ``run_id`` in the token request.
        feedback_key: Sent as ``feedback_key`` in the token request.
        expiration: The expiration time of the pre-signed URL.
            Either a datetime (sent as ``expires_at``) or a timedelta
            offset (sent as ``expires_in``). Defaults to 3 hours.
        feedback_config: FeedbackConfig or None.
            If creating a feedback_key for the first time,
            this defines how the metric should be interpreted,
            such as a continuous score (w/ optional bounds),
            or distribution over categorical values.
        feedback_id: The ID of the feedback to create. A random UUID is
            used when it is not provided.

    Returns:
        A ``FeedbackIngestToken`` built from the response JSON.

    Raises:
        ValueError: If ``expiration`` is neither None, a datetime, nor a
            timedelta.
    """
    body: dict[str, Any] = {
        "run_id": run_id,
        "feedback_key": feedback_key,
        "feedback_config": feedback_config,
        "id": feedback_id or str(uuid.uuid4()),
    }

    if isinstance(expiration, datetime.datetime):
        body["expires_at"] = expiration.isoformat()
    elif isinstance(expiration, datetime.timedelta):
        # Only whole days, hours and minutes are sent; leftover seconds
        # and microseconds are dropped.
        whole_hours, leftover_seconds = divmod(expiration.seconds, 3600)
        body["expires_in"] = ls_schemas.TimeDeltaInput(
            days=expiration.days,
            hours=whole_hours,
            minutes=leftover_seconds // 60,
        )
    elif expiration is None:
        body["expires_in"] = ls_schemas.TimeDeltaInput(
            days=0,
            hours=3,
            minutes=0,
        )
    else:
        raise ValueError(
            f"Invalid expiration type: {type(expiration)}. "
            "Expected datetime.datetime or datetime.timedelta."
        )

    response = await self._arequest_with_retries(
        "POST",
        "/feedback/tokens",
        content=ls_client._dumps_json(body),
    )
    return ls_schemas.FeedbackIngestToken(**response.json())

834 

async def read_feedback(
    self, feedback_id: ls_client.ID_TYPE
) -> ls_schemas.Feedback:
    """Fetch one feedback record by ID and return it as a ``Feedback``."""
    endpoint = f"/feedback/{ls_client._as_uuid(feedback_id)}"
    response = await self._arequest_with_retries("GET", endpoint)
    payload = response.json()
    return ls_schemas.Feedback(**payload)

843 

async def list_feedback(
    self,
    *,
    run_ids: Optional[Sequence[ls_client.ID_TYPE]] = None,
    feedback_key: Optional[Sequence[str]] = None,
    feedback_source_type: Optional[Sequence[ls_schemas.FeedbackSourceType]] = None,
    limit: Optional[int] = None,
    **kwargs: Any,
) -> AsyncIterator[ls_schemas.Feedback]:
    """Yield feedback records from the paginated ``/feedback`` listing.

    Args:
        run_ids: Run IDs to filter by, sent as the ``run`` parameter.
        feedback_key: Feedback keys to filter by, sent as ``key``.
        feedback_source_type: Source types to filter by, sent as ``source``.
        limit: Maximum number of records to yield. The page size sent to
            the API is ``min(limit, 100)``, or 100 without a limit.
        **kwargs: Additional query parameters.

    Yields:
        A ``Feedback`` per returned record.
    """
    run_filter = (
        [str(ls_client._as_uuid(id_)) for id_ in run_ids] if run_ids else None
    )
    page_size = 100 if limit is None else min(limit, 100)
    params = {"run": run_filter, "limit": page_size, **kwargs}
    if feedback_key is not None:
        params["key"] = feedback_key
    if feedback_source_type is not None:
        params["source"] = feedback_source_type

    yielded = 0
    async for record in self._aget_paginated_list("/feedback", params=params):
        yield ls_schemas.Feedback(**record)
        yielded += 1
        if limit is not None and yielded >= limit:
            break

871 

async def delete_feedback(self, feedback_id: ID_TYPE) -> None:
    """Delete a feedback by ID.

    Args:
        feedback_id (Union[UUID, str]):
            The ID of the feedback to delete.

    Returns:
        None
    """
    endpoint = f"/feedback/{ls_client._as_uuid(feedback_id, 'feedback_id')}"
    response = await self._arequest_with_retries("DELETE", endpoint)
    ls_utils.raise_for_status_with_text(response)

886 

887 # Annotation Queue API 

888 

async def list_annotation_queues(
    self,
    *,
    queue_ids: Optional[list[ID_TYPE]] = None,
    name: Optional[str] = None,
    name_contains: Optional[str] = None,
    limit: Optional[int] = None,
) -> AsyncIterator[ls_schemas.AnnotationQueue]:
    """List the annotation queues on the LangSmith API.

    Args:
        queue_ids (Optional[List[Union[UUID, str]]]):
            The IDs of the queues to filter by.
        name (Optional[str]):
            The name of the queue to filter by.
        name_contains (Optional[str]):
            The substring that the queue name should contain.
        limit (Optional[int]):
            The maximum number of queues to return. The page size sent to
            the API is ``min(limit, 100)``, or 100 without a limit.

    Yields:
        The annotation queues.
    """
    id_filter = None
    if queue_ids is not None:
        id_filter = [
            ls_client._as_uuid(id_, f"queue_ids[{i}]")
            for i, id_ in enumerate(queue_ids)
        ]
    page_size = 100 if limit is None else min(limit, 100)
    params: dict = {
        "ids": id_filter,
        "name": name,
        "name_contains": name_contains,
        "limit": page_size,
    }

    yielded = 0
    async for queue in self._aget_paginated_list(
        "/annotation-queues", params=params
    ):
        yield ls_schemas.AnnotationQueue(**queue)
        yielded += 1
        if limit is not None and yielded >= limit:
            break

933 

async def create_annotation_queue(
    self,
    *,
    name: str,
    description: Optional[str] = None,
    queue_id: Optional[ID_TYPE] = None,
) -> ls_schemas.AnnotationQueue:
    """Create an annotation queue on the LangSmith API.

    Args:
        name (str):
            The name of the annotation queue.
        description (Optional[str]):
            The description of the annotation queue; left out of the
            request when None.
        queue_id (Optional[Union[UUID, str]]):
            The ID of the annotation queue. A random UUID is used when it
            is not given.

    Returns:
        AnnotationQueue: The created annotation queue object.
    """
    resolved_id = str(uuid.uuid4()) if queue_id is None else str(queue_id)
    candidate_fields = (
        ("name", name),
        ("description", description),
        ("id", resolved_id),
    )
    payload = {key: val for key, val in candidate_fields if val is not None}
    response = await self._arequest_with_retries(
        "POST",
        "/annotation-queues",
        json=payload,
    )
    ls_utils.raise_for_status_with_text(response)
    return ls_schemas.AnnotationQueue(**response.json())

968 

async def read_annotation_queue(
    self, queue_id: ID_TYPE
) -> ls_schemas.AnnotationQueue:
    """Fetch a single annotation queue by its ID.

    Args:
        queue_id (Union[UUID, str]): ID of the annotation queue to look up.

    Returns:
        AnnotationQueue: The first queue yielded by ``list_annotation_queues``
            when filtered by ``queue_id``.
    """
    # TODO: Replace when actual endpoint is added
    matches = self.list_annotation_queues(queue_ids=[queue_id])
    first_match = await matches.__anext__()
    return first_match

982 

async def update_annotation_queue(
    self, queue_id: ID_TYPE, *, name: str, description: Optional[str] = None
) -> None:
    """Send a ``PATCH`` with a new name and description for a queue.

    Args:
        queue_id (Union[UUID, str]): ID of the annotation queue to update.
        name (str): New name for the queue.
        description (Optional[str]): New description for the queue. It is sent
            in the request body even when ``None``.

    Returns:
        None
    """
    queue_uuid = ls_client._as_uuid(queue_id, "queue_id")
    changes = {"name": name, "description": description}
    response = await self._arequest_with_retries(
        "PATCH", f"/annotation-queues/{queue_uuid}", json=changes
    )
    ls_utils.raise_for_status_with_text(response)

1006 

async def delete_annotation_queue(self, queue_id: ID_TYPE) -> None:
    """Delete the annotation queue identified by ``queue_id``.

    Args:
        queue_id (Union[UUID, str]): ID of the annotation queue to delete.

    Returns:
        None
    """
    queue_uuid = ls_client._as_uuid(queue_id, "queue_id")
    # Client-level headers are merged last, so they win on a key collision.
    request_headers = {"Accept": "application/json", **self._client.headers}
    response = await self._arequest_with_retries(
        "DELETE", f"/annotation-queues/{queue_uuid}", headers=request_headers
    )
    ls_utils.raise_for_status_with_text(response)

1022 

async def add_runs_to_annotation_queue(
    self, queue_id: ID_TYPE, *, run_ids: list[ID_TYPE]
) -> None:
    """Attach a batch of runs to an annotation queue.

    Args:
        queue_id (Union[UUID, str]): ID of the target annotation queue.
        run_ids (List[Union[UUID, str]]): IDs of the runs to add. Each one is
            validated as a UUID and sent as a string.

    Returns:
        None
    """
    queue_uuid = ls_client._as_uuid(queue_id, "queue_id")
    serialized_run_ids = []
    for position, run_id in enumerate(run_ids):
        validated = ls_client._as_uuid(run_id, f"run_ids[{position}]")
        serialized_run_ids.append(str(validated))
    response = await self._arequest_with_retries(
        "POST", f"/annotation-queues/{queue_uuid}/runs", json=serialized_run_ids
    )
    ls_utils.raise_for_status_with_text(response)

1045 

async def delete_run_from_annotation_queue(
    self, queue_id: ID_TYPE, *, run_id: ID_TYPE
) -> None:
    """Remove one run from an annotation queue.

    Args:
        queue_id (Union[UUID, str]): ID of the annotation queue.
        run_id (Union[UUID, str]): ID of the run to remove from the queue.

    Returns:
        None
    """
    queue_uuid = ls_client._as_uuid(queue_id, "queue_id")
    run_uuid = ls_client._as_uuid(run_id, "run_id")
    endpoint = f"/annotation-queues/{queue_uuid}/runs/{run_uuid}"
    response = await self._arequest_with_retries("DELETE", endpoint)
    ls_utils.raise_for_status_with_text(response)

1064 

async def get_run_from_annotation_queue(
    self, queue_id: ID_TYPE, *, index: int
) -> ls_schemas.RunWithAnnotationQueueInfo:
    """Fetch the run stored at a given position of an annotation queue.

    Args:
        queue_id (Union[UUID, str]): ID of the annotation queue.
        index (int): Position of the run inside the queue.

    Returns:
        RunWithAnnotationQueueInfo: The run found at ``index``.

    Raises:
        LangSmithNotFoundError: If the run is not found at the given index.
        LangSmithError: For other API-related errors.
    """
    queue_uuid = ls_client._as_uuid(queue_id, "queue_id")
    endpoint = f"/annotation-queues/{queue_uuid}/run/{index}"
    response = await self._arequest_with_retries("GET", endpoint)
    ls_utils.raise_for_status_with_text(response)
    run_payload = response.json()
    return ls_schemas.RunWithAnnotationQueueInfo(**run_payload)

1085 

@ls_beta.warn_beta
async def index_dataset(
    self,
    *,
    dataset_id: ls_client.ID_TYPE,
    tag: str = "latest",
    **kwargs: Any,
) -> None:
    """Turn on indexing for a dataset; examples are indexed by their inputs.

    Once indexed, similar examples can be looked up by inputs with
    ``client.similar_examples()``.

    Args:
        dataset_id (UUID): ID of the dataset to index.
        tag (str, optional): Dataset version to index. With 'latest', any
            later change to the dataset (added, updated or deleted examples)
            is reflected in the index.
        **kwargs (Any): Extra fields merged into the request body.

    Returns:
        None
    """
    dataset_id = ls_client._as_uuid(dataset_id, "dataset_id")
    request_body = {"tag": tag}
    request_body.update(kwargs)
    resp = await self._arequest_with_retries(
        "POST",
        f"/datasets/{dataset_id}/index",
        content=ls_client._dumps_json(request_body),
    )
    ls_utils.raise_for_status_with_text(resp)

1118 

@ls_beta.warn_beta
async def sync_indexed_dataset(
    self,
    *,
    dataset_id: ls_client.ID_TYPE,
    **kwargs: Any,
) -> None:
    """Force a sync of a dataset's index.

    The sync already runs automatically every 5 minutes; call this only when
    the index must be refreshed right away.

    Args:
        dataset_id (UUID): ID of the dataset whose index should be synced.
        **kwargs (Any): Extra fields sent as the request body.

    Returns:
        None
    """
    dataset_id = ls_client._as_uuid(dataset_id, "dataset_id")
    request_body = dict(kwargs)
    resp = await self._arequest_with_retries(
        "POST",
        f"/datasets/{dataset_id}/index/sync",
        content=ls_client._dumps_json(request_body),
    )
    ls_utils.raise_for_status_with_text(resp)

1147 

@ls_beta.warn_beta
async def similar_examples(
    self,
    inputs: dict,
    /,
    *,
    limit: int,
    dataset_id: ls_client.ID_TYPE,
    filter: Optional[str] = None,
    **kwargs: Any,
) -> list[ls_schemas.ExampleSearch]:
    """Search a dataset for the examples whose inputs best match ``inputs``.

    !!! note

        Few-shot indexing must be enabled for the dataset first. See
        `client.index_dataset()`.

    Args:
        inputs (dict): Search query. Must match the dataset input schema and
            be JSON serializable.
        limit (int): Maximum number of examples to return.
        dataset_id (str or UUID): ID of the dataset to search over.
        filter (str, optional): Filter string applied to the search results.
            Same syntax as the `filter` parameter of `list_runs()`; only a
            subset of operations is supported. Not sent when empty or `None`.
        kwargs (Any): Additional keyword args merged into the request body.

    Returns:
        List of ExampleSearch objects, one per entry of the response's
        "examples" list, each tagged with ``dataset_id``.

    Examples:
        ```python
        from langsmith import AsyncClient

        client = AsyncClient()
        await client.similar_examples(
            {"question": "When would i use the runnable generator"},
            limit=3,
            dataset_id="...",
        )
        ```

        ```python
        [
            ExampleSearch(
                inputs={"question": "What's a runnable lambda?"},
                outputs={"answer": "A runnable lambda is an object that ..."},
                metadata=None,
                id=UUID("f94104a7-2434-4ba7-8293-6a283f4860b4"),
                dataset_id=UUID("01b6ce0f-bfb6-4f48-bbb8-f19272135d40"),
            ),
            ...
        ]
        ```
    """
    dataset_id = ls_client._as_uuid(dataset_id, "dataset_id")
    search_request = {"inputs": inputs, "limit": limit, **kwargs}
    if filter:
        search_request["filter"] = filter

    resp = await self._arequest_with_retries(
        "POST",
        f"/datasets/{dataset_id}/search",
        content=ls_client._dumps_json(search_request),
    )
    ls_utils.raise_for_status_with_text(resp)
    return [
        ls_schemas.ExampleSearch(**raw_example, dataset_id=dataset_id)
        for raw_example in resp.json()["examples"]
    ]

1244 

async def _get_settings(self) -> ls_schemas.LangSmithSettings:
    """Return the current tenant's settings, fetching them at most once.

    The first call requests ``GET /settings`` and stores the parsed result on
    the client; later calls return the stored object.

    Returns:
        LangSmithSettings: The settings for the current tenant.
    """
    cached = self._settings
    if cached is not None:
        return cached

    response = await self._arequest_with_retries("GET", "/settings")
    ls_utils.raise_for_status_with_text(response)
    self._settings = ls_schemas.LangSmithSettings(**response.json())
    return self._settings

1257 

async def _current_tenant_is_owner(self, owner: str) -> bool:
    """Tell whether ``owner`` refers to the current workspace.

    Args:
        owner (str): Owner handle to compare against. The placeholder "-"
            always counts as the current tenant.

    Returns:
        bool: True if ``owner`` is "-" or equals the tenant handle from the
            settings, False otherwise.
    """
    # Settings are loaded up front, exactly as callers have always observed.
    tenant_settings = await self._get_settings()
    if owner == "-":
        return True
    return tenant_settings.tenant_handle == owner

1269 

async def _owner_conflict_error(
    self, action: str, owner: str
) -> ls_utils.LangSmithUserError:
    """Build (but do not raise) the error for acting on another tenant's prompt.

    Args:
        action (str): Description of the attempted action, interpolated into
            the message after "Cannot".
        owner (str): The owner that was requested.

    Returns:
        LangSmithUserError: Error naming the current and the requested tenant.
    """
    tenant_settings = await self._get_settings()
    message = (
        f"Cannot {action} for another tenant.\n"
        f"Current tenant: {tenant_settings.tenant_handle},\n"
        f"Requested tenant: {owner}"
    )
    return ls_utils.LangSmithUserError(message)

1279 

async def _get_latest_commit_hash(
    self, prompt_owner_and_name: str, limit: int = 1, offset: int = 0
) -> Optional[str]:
    """Look up the hash of the first commit listed for a prompt.

    Args:
        prompt_owner_and_name (str): The prompt as "owner/name".
        limit (int, default=1): Maximum number of commits to request.
        offset (int, default=0): Number of commits to skip.

    Returns:
        Optional[str]: The ``commit_hash`` of the first commit in the
            response, or None when the response lists no commits.
    """
    query = {"limit": limit, "offset": offset}
    response = await self._arequest_with_retries(
        "GET", f"/commits/{prompt_owner_and_name}/", params=query
    )
    commits = response.json()["commits"]
    if not commits:
        return None
    return commits[0]["commit_hash"]

1300 

async def _like_or_unlike_prompt(
    self, prompt_identifier: str, like: bool
) -> dict[str, int]:
    """Set or clear the caller's like on a prompt.

    Args:
        prompt_identifier (str): The identifier of the prompt.
        like (bool): True to like the prompt, False to remove the like.

    Returns:
        The decoded JSON response; a dictionary with the key 'likes' and the
        count of likes as the value.

    Raises:
        httpx.HTTPStatusError: If the response has an error status.
    """
    owner, prompt_name, _ = ls_utils.parse_prompt_identifier(prompt_identifier)
    endpoint = f"/likes/{owner}/{prompt_name}"
    response = await self._arequest_with_retries(
        "POST", endpoint, json={"like": like}
    )
    response.raise_for_status()
    return response.json()

1323 

async def _get_prompt_url(self, prompt_identifier: str) -> str:
    """Build the web URL for a prompt commit.

    Prompts owned by another tenant link to the public hub page; prompts
    owned by the current tenant link to the workspace prompts page.

    Args:
        prompt_identifier (str): The identifier of the prompt.

    Returns:
        str: The URL for the prompt, with the commit hash truncated to its
            first 8 characters.
    """
    owner, prompt_name, commit_hash = ls_utils.parse_prompt_identifier(
        prompt_identifier
    )
    short_hash = commit_hash[:8]

    # The ownership check is a coroutine and must be awaited: an un-awaited
    # coroutine object is always truthy, which made the hub branch unreachable.
    if not (await self._current_tenant_is_owner(owner)):
        return f"{self._host_url}/hub/{owner}/{prompt_name}:{short_hash}"

    settings = await self._get_settings()
    return (
        f"{self._host_url}/prompts/{prompt_name}/{short_hash}"
        f"?organizationId={settings.id}"
    )

1346 

async def _prompt_exists(self, prompt_identifier: str) -> bool:
    """Report whether ``get_prompt`` finds the given prompt.

    Args:
        prompt_identifier (str): The identifier of the prompt.

    Returns:
        bool: True if ``get_prompt`` returned a truthy prompt, False otherwise.
    """
    found = await self.get_prompt(prompt_identifier)
    return bool(found)

1358 

async def like_prompt(self, prompt_identifier: str) -> dict[str, int]:
    """Add a like to a prompt.

    Args:
        prompt_identifier (str): The identifier of the prompt.

    Returns:
        Dict[str, int]: A dictionary with the key 'likes' and the count of
            likes as the value.
    """
    like_counts = await self._like_or_unlike_prompt(prompt_identifier, like=True)
    return like_counts

1370 

async def unlike_prompt(self, prompt_identifier: str) -> dict[str, int]:
    """Remove a like from a prompt.

    Args:
        prompt_identifier (str): The identifier of the prompt.

    Returns:
        Dict[str, int]: A dictionary with the key 'likes' and the count of
            likes as the value.
    """
    like_counts = await self._like_or_unlike_prompt(prompt_identifier, like=False)
    return like_counts

1382 

async def list_prompts(
    self,
    *,
    limit: int = 100,
    offset: int = 0,
    is_public: Optional[bool] = None,
    is_archived: Optional[bool] = False,
    sort_field: ls_schemas.PromptSortField = ls_schemas.PromptSortField.updated_at,
    sort_direction: Literal["desc", "asc"] = "desc",
    query: Optional[str] = None,
) -> ls_schemas.ListPromptsResponse:
    """Fetch one page of prompts.

    Args:
        limit (int, default=100): Maximum number of prompts to return.
        offset (int, default=0): Number of prompts to skip.
        is_public (Optional[bool]): Filter by visibility. No filter is sent
            when None.
        is_archived (Optional[bool]): Filter by archived state. Any falsy
            value, including None, is sent as "false".
        sort_field (PromptSortField): Field to sort by. Defaults to
            "updated_at".
        sort_direction (Literal["desc", "asc"], default="desc"): Sort order.
        query (Optional[str]): Search query. When given, prefix matching is
            requested as well.

    Returns:
        ListPromptsResponse: A response object containing the list of prompts.
    """
    if is_public is None:
        public_flag = None
    elif is_public:
        public_flag = "true"
    else:
        public_flag = "false"

    if isinstance(sort_field, ls_schemas.PromptSortField):
        sort_key = sort_field.value
    else:
        # Tolerate callers that pass the raw field name instead of the enum.
        sort_key = sort_field

    params = {
        "limit": limit,
        "offset": offset,
        "is_public": public_flag,
        "is_archived": "true" if is_archived else "false",
        "sort_field": sort_key,
        "sort_direction": sort_direction,
        "query": query,
        "match_prefix": "true" if query else None,
    }

    response = await self._arequest_with_retries(
        "GET", "/repos/", params=_exclude_none(params)
    )
    listing = response.json()
    return ls_schemas.ListPromptsResponse(**listing)

1432 

async def get_prompt(self, prompt_identifier: str) -> Optional[ls_schemas.Prompt]:
    """Look up a prompt by its identifier.

    Args:
        prompt_identifier (str): The identifier of the prompt, in the format
            "prompt_name" or "owner/prompt_name".

    Returns:
        Optional[Prompt]: The prompt built from the response's "repo" entry,
            or None when the lookup raises ``LangSmithNotFoundError``.
    """
    owner, prompt_name, _ = ls_utils.parse_prompt_identifier(prompt_identifier)
    endpoint = f"/repos/{owner}/{prompt_name}"
    try:
        response = await self._arequest_with_retries("GET", endpoint)
        repo = response.json()["repo"]
        return ls_schemas.Prompt(**repo)
    except ls_utils.LangSmithNotFoundError:
        # A missing prompt is reported as None rather than as an error.
        return None

1455 

async def create_prompt(
    self,
    prompt_identifier: str,
    *,
    description: Optional[str] = None,
    readme: Optional[str] = None,
    tags: Optional[Sequence[str]] = None,
    is_public: bool = False,
) -> ls_schemas.Prompt:
    """Create a new, empty prompt.

    Only the prompt's metadata is created; no prompt object is attached.

    Args:
        prompt_identifier (str): The identifier of the prompt, in the format
            owner/name:hash, name:hash, owner/name, or name.
        description (Optional[str]): Description of the prompt. Sent as an
            empty string when omitted.
        readme (Optional[str]): Readme for the prompt. Sent as an empty
            string when omitted.
        tags (Optional[Sequence[str]]): Tags for the prompt. Sent as an empty
            list when omitted.
        is_public (bool): Whether the prompt should be public. Defaults to
            False.

    Returns:
        Prompt: The created prompt object.

    Raises:
        LangSmithUserError: If a public prompt is requested but the tenant
            has no handle, or if the current tenant is not the owner.
        httpx.HTTPStatusError: If the server responds with an error status.
    """
    tenant_settings = await self._get_settings()
    if is_public and not tenant_settings.tenant_handle:
        raise ls_utils.LangSmithUserError(
            "Cannot create a public prompt without first\n"
            "creating a LangChain Hub handle. "
            "You can add a handle by creating a public prompt at:\n"
            "https://smith.langchain.com/prompts"
        )

    owner, prompt_name, _ = ls_utils.parse_prompt_identifier(prompt_identifier)
    owns_prompt = await self._current_tenant_is_owner(owner=owner)
    if not owns_prompt:
        conflict = await self._owner_conflict_error("create a prompt", owner)
        raise conflict

    payload: dict[str, Union[str, bool, Sequence[str]]] = {
        "repo_handle": prompt_name,
        "description": description or "",
        "readme": readme or "",
        "tags": tags or [],
        "is_public": is_public,
    }

    response = await self._arequest_with_retries("POST", "/repos/", json=payload)
    response.raise_for_status()
    repo = response.json()["repo"]
    return ls_schemas.Prompt(**repo)

1508 

async def create_commit(
    self,
    prompt_identifier: str,
    object: Any,
    *,
    parent_commit_hash: Optional[str] = None,
) -> str:
    """Create a commit for an existing prompt.

    This method requires ``langchain-core`` to serialize ``object``.

    Args:
        prompt_identifier (str): The identifier of the prompt.
        object (Any): The LangChain object to commit.
        parent_commit_hash (Optional[str]): The hash of the parent commit.
            ``None`` and "latest" both resolve to the prompt's latest commit.

    Returns:
        str: The url of the prompt commit.

    Raises:
        LangSmithNotFoundError: If the prompt does not exist.
        ImportError: If ``langchain-core`` is not installed.
    """
    if not (await self._prompt_exists(prompt_identifier)):
        raise ls_utils.LangSmithNotFoundError(
            "Prompt does not exist, you must create it first."
        )

    try:
        from langchain_core.load import dumps
    except ImportError as exc:
        # The space after "langchain-core" keeps the two literals from
        # fusing into "langchain-corepackage".
        raise ImportError(
            "The client.create_commit function requires the langchain-core "
            "package to run.\nInstall with `pip install langchain-core`"
        ) from exc

    # Serialize through langchain-core, then reload as plain JSON so the
    # manifest can be embedded in the request body.
    chain_to_push = ls_client.prep_obj_for_push(object)
    manifest_dict = json.loads(dumps(chain_to_push))

    owner, prompt_name, _ = ls_utils.parse_prompt_identifier(prompt_identifier)
    prompt_owner_and_name = f"{owner}/{prompt_name}"

    if parent_commit_hash is None or parent_commit_hash == "latest":
        parent_commit_hash = await self._get_latest_commit_hash(
            prompt_owner_and_name
        )

    request_dict = {"parent_commit": parent_commit_hash, "manifest": manifest_dict}
    response = await self._arequest_with_retries(
        "POST", f"/commits/{prompt_owner_and_name}", json=request_dict
    )

    commit_hash = response.json()["commit"]["commit_hash"]

    return await self._get_prompt_url(f"{prompt_owner_and_name}:{commit_hash}")

1564 

async def update_prompt(
    self,
    prompt_identifier: str,
    *,
    description: Optional[str] = None,
    readme: Optional[str] = None,
    tags: Optional[Sequence[str]] = None,
    is_public: Optional[bool] = None,
    is_archived: Optional[bool] = None,
) -> dict[str, Any]:
    """Change a prompt's metadata.

    Use push_prompt or create_commit to change the prompt's content. Only
    the arguments that are not None are sent to the server.

    Args:
        prompt_identifier (str): The identifier of the prompt to update.
        description (Optional[str]): New description for the prompt.
        readme (Optional[str]): New readme for the prompt.
        tags (Optional[Sequence[str]]): New list of tags for the prompt.
        is_public (Optional[bool]): New public status for the prompt.
        is_archived (Optional[bool]): New archived status for the prompt.

    Returns:
        Dict[str, Any]: The updated prompt data as returned by the server.

    Raises:
        ValueError: If the prompt is made public but the tenant has no handle.
        httpx.HTTPStatusError: If the server responds with an error status.
    """
    tenant_settings = await self._get_settings()
    if is_public and not tenant_settings.tenant_handle:
        raise ValueError(
            "Cannot create a public prompt without first\n"
            "creating a LangChain Hub handle. "
            "You can add a handle by creating a public prompt at:\n"
            "https://smith.langchain.com/prompts"
        )

    requested_changes = (
        ("description", description),
        ("readme", readme),
        ("is_public", is_public),
        ("is_archived", is_archived),
        ("tags", tags),
    )
    payload: dict[str, Union[str, bool, Sequence[str]]] = {
        field: value for field, value in requested_changes if value is not None
    }

    owner, prompt_name, _ = ls_utils.parse_prompt_identifier(prompt_identifier)
    response = await self._arequest_with_retries(
        "PATCH", f"/repos/{owner}/{prompt_name}", json=payload
    )
    response.raise_for_status()
    return response.json()

1622 

async def delete_prompt(self, prompt_identifier: str) -> None:
    """Delete a prompt owned by the current tenant.

    Args:
        prompt_identifier (str): The identifier of the prompt to delete.

    Returns:
        None

    Raises:
        LangSmithUserError: If the current tenant is not the owner of the
            prompt.
        httpx.HTTPStatusError: If the server responds with an error status.
    """
    owner, prompt_name, _ = ls_utils.parse_prompt_identifier(prompt_identifier)
    owns_prompt = await self._current_tenant_is_owner(owner)
    if not owns_prompt:
        conflict = await self._owner_conflict_error("delete a prompt", owner)
        raise conflict

    endpoint = f"/repos/{owner}/{prompt_name}"
    response = await self._arequest_with_retries("DELETE", endpoint)
    response.raise_for_status()

1643 

async def pull_prompt_commit(
    self,
    prompt_identifier: str,
    *,
    include_model: Optional[bool] = False,
) -> ls_schemas.PromptCommit:
    """Fetch a prompt commit from the LangSmith API.

    Args:
        prompt_identifier (str): The identifier of the prompt.
        include_model (Optional[bool]): When truthy, ``include_model=true``
            is appended to the request's query string.

    Returns:
        PromptCommit: The commit, with ``owner`` and ``repo`` taken from the
            identifier unless the response provides its own values.
    """
    owner, prompt_name, commit_hash = ls_utils.parse_prompt_identifier(
        prompt_identifier
    )
    endpoint = f"/commits/{owner}/{prompt_name}/{commit_hash}"
    if include_model:
        endpoint += "?include_model=true"

    response = await self._arequest_with_retries("GET", endpoint)
    # Response fields come last, so they override the identifier-derived ones.
    commit_fields = {"owner": owner, "repo": prompt_name, **response.json()}
    return ls_schemas.PromptCommit(**commit_fields)

1674 

async def list_prompt_commits(
    self,
    prompt_identifier: str,
    *,
    limit: Optional[int] = None,
    offset: int = 0,
    include_model: bool = False,
) -> AsyncGenerator[ls_schemas.ListedPromptCommit, None]:
    """List commits for a given prompt.

    Args:
        prompt_identifier (str): The identifier of the prompt in the format
            'owner/repo_name'.
        limit (Optional[int]): The maximum number of commits to return. If
            None, returns all commits. Defaults to None.
        offset (int, default=0): The number of commits to skip before
            starting to return results.
        include_model (bool, default=False): Whether to include the model
            information in the commit data.

    Yields:
        A ListedPromptCommit object for each commit.

    !!! note

        This method paginates: it issues as many API calls as needed to
        retrieve all commits, or up to the specified limit. Each page
        requests at most 100 commits.
    """
    owner, prompt_name, _ = ls_utils.parse_prompt_identifier(prompt_identifier)

    params = {
        "limit": min(100, limit) if limit is not None else limit,
        "offset": offset,
        "include_model": include_model,
    }
    yielded = 0
    while True:
        params["offset"] = offset
        response = await self._arequest_with_retries(
            "GET",
            f"/commits/{owner}/{prompt_name}/",
            params=params,
        )
        page = response.json()
        commits = page["commits"]
        total = page["total"]

        if not commits:
            break
        for commit in commits:
            if limit is not None and yielded >= limit:
                return  # Stop iteration if we've reached the limit
            yield ls_schemas.ListedPromptCommit(
                **{"owner": owner, "repo": prompt_name, **commit}
            )
            yielded += 1

        # If the limit was hit exactly at the end of this page, stop here
        # instead of requesting a further page that would be discarded.
        if limit is not None and yielded >= limit:
            return

        offset += len(commits)
        if offset >= total:
            break

1731 

async def pull_prompt(
    self, prompt_identifier: str, *, include_model: Optional[bool] = False
) -> Any:
    """Pull a prompt and return it as a LangChain `PromptTemplate`.

    This method requires [`langchain-core`](https://pypi.org/project/langchain-core).

    Args:
        prompt_identifier: The identifier of the prompt.
        include_model: Whether to include the model information in the prompt data.

    Returns:
        Any: The object loaded from the commit manifest. When it is a prompt
            template (or a sequence starting with one), the template's
            metadata is tagged with the hub owner, repo and commit hash.

    Raises:
        ImportError: If ``langchain-core`` is not installed.
    """
    try:
        from langchain_core.language_models.base import BaseLanguageModel
        from langchain_core.load.load import loads
        from langchain_core.output_parsers import BaseOutputParser
        from langchain_core.prompts import BasePromptTemplate
        from langchain_core.prompts.structured import StructuredPrompt
        from langchain_core.runnables.base import RunnableBinding, RunnableSequence
    except ImportError as exc:
        # The space after "langchain-core" keeps the two literals from
        # fusing into "langchain-corepackage".
        raise ImportError(
            "The client.pull_prompt function requires the langchain-core "
            "package to run.\nInstall with `pip install langchain-core`"
        ) from exc
    try:
        from langchain_core._api import suppress_langchain_beta_warning
    except ImportError:
        # Fall back to a no-op context manager when the helper is unavailable.
        @contextlib.contextmanager
        def suppress_langchain_beta_warning():
            yield

    prompt_object = await self.pull_prompt_commit(
        prompt_identifier, include_model=include_model
    )
    with suppress_langchain_beta_warning():
        prompt = loads(json.dumps(prompt_object.manifest))

    # Locate the prompt template: either the loaded object itself or the
    # first step of a sequence. Anything else carries no template to tag.
    if isinstance(prompt, BasePromptTemplate):
        prompt_template = prompt
    elif isinstance(prompt, RunnableSequence) and isinstance(
        prompt.first, BasePromptTemplate
    ):
        prompt_template = prompt.first
    else:
        prompt_template = None

    if prompt_template is not None:
        if prompt_template.metadata is None:
            prompt_template.metadata = {}
        prompt_template.metadata.update(
            {
                "lc_hub_owner": prompt_object.owner,
                "lc_hub_repo": prompt_object.repo,
                "lc_hub_commit_hash": prompt_object.commit_hash,
            }
        )

    # Transform 2-step RunnableSequence to 3-step for structured prompts
    # See create_commit for the reverse transformation when pushing a prompt
    if (
        include_model
        and isinstance(prompt, RunnableSequence)
        and isinstance(prompt.first, StructuredPrompt)
        # Make forward-compatible in case we let update the response type
        and (
            len(prompt.steps) == 2 and not isinstance(prompt.last, BaseOutputParser)
        )
    ):
        if isinstance(prompt.last, RunnableBinding) and isinstance(
            prompt.last.bound, BaseLanguageModel
        ):
            seq = cast(RunnableSequence, prompt.first | prompt.last.bound)
            if len(seq.steps) == 3:  # prompt | bound llm | output parser
                rebound_llm = seq.steps[1]
                # Re-apply the kwargs that were bound on the original model.
                prompt = RunnableSequence(
                    prompt.first,
                    rebound_llm.bind(**prompt.last.kwargs),
                    seq.last,
                )
            else:
                prompt = seq  # Not sure

        elif isinstance(prompt.last, BaseLanguageModel):
            prompt: RunnableSequence = prompt.first | prompt.last  # type: ignore[no-redef, assignment]

    return prompt

1833 

async def push_prompt(
    self,
    prompt_identifier: str,
    *,
    object: Optional[Any] = None,
    parent_commit_hash: str = "latest",
    is_public: Optional[bool] = None,
    description: Optional[str] = None,
    readme: Optional[str] = None,
    tags: Optional[Sequence[str]] = None,
) -> str:
    """Create or update a prompt, optionally committing new content.

    A prompt that does not exist yet is created. For an existing prompt the
    metadata is updated only when at least one metadata argument is given.
    When ``object`` is provided, a commit is created for it afterwards.

    Args:
        prompt_identifier (str): The identifier of the prompt.
        object (Optional[Any]): The LangChain object to push.
        parent_commit_hash (str): The parent commit hash.
            Defaults to "latest".
        is_public (Optional[bool]): Whether the prompt should be public.
            With None (default), an existing prompt keeps its visibility and
            a new prompt is created private.
        description (Optional[str]): A description of the prompt.
        readme (Optional[str]): A readme for the prompt.
        tags (Optional[Sequence[str]]): A list of tags for the prompt.

    Returns:
        str: The URL of the prompt, or of the new commit when ``object`` is
            given.
    """
    already_exists = await self._prompt_exists(prompt_identifier)
    if not already_exists:
        await self.create_prompt(
            prompt_identifier,
            is_public=False if is_public is None else is_public,
            description=description,
            readme=readme,
            tags=tags,
        )
    else:
        metadata_args = (is_public, description, readme, tags)
        # Skip the update request when no metadata argument was supplied.
        if not all(arg is None for arg in metadata_args):
            await self.update_prompt(
                prompt_identifier,
                description=description,
                readme=readme,
                tags=tags,
                is_public=is_public,
            )

    if object is None:
        # Metadata-only push: nothing to commit.
        return await self._get_prompt_url(prompt_identifier=prompt_identifier)

    # Create a commit with the new manifest
    commit_url = await self.create_commit(
        prompt_identifier,
        object,
        parent_commit_hash=parent_commit_hash,
    )
    return commit_url

1901 return url 

1902 

1903 

def _exclude_none(d: dict) -> dict:
    """Return a copy of ``d`` without the entries whose value is ``None``.

    Only an identity check against ``None`` is applied, so other falsy
    values (``0``, ``""``, ``False``, empty containers) are kept. The input
    mapping is not modified and key order is preserved.
    """
    kept: dict = {}
    for key, value in d.items():
        if value is None:
            continue
        kept[key] = value
    return kept