Coverage for langsmith/_internal/_background_thread.py: 20%

389 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-12-11 16:15 -0800

1from __future__ import annotations 

2 

3import concurrent.futures as cf 

4import copy 

5import functools 

6import io 

7import logging 

8import sys 

9import threading 

10import time 

11import weakref 

12from multiprocessing import cpu_count 

13from queue import Empty, Queue 

14from typing import TYPE_CHECKING, Any, Optional, Union, cast 

15 

16from langsmith import schemas as ls_schemas 

17from langsmith import utils as ls_utils 

18from langsmith._internal._compressed_traces import CompressedTraces 

19from langsmith._internal._constants import ( 

20 _AUTO_SCALE_DOWN_NEMPTY_TRIGGER, 

21 _AUTO_SCALE_UP_NTHREADS_LIMIT, 

22 _AUTO_SCALE_UP_QSIZE_TRIGGER, 

23 _BOUNDARY, 

24) 

25from langsmith._internal._operations import ( 

26 SerializedFeedbackOperation, 

27 SerializedRunOperation, 

28 combine_serialized_queue_operations, 

29) 

30 

31if TYPE_CHECKING: 

32 from opentelemetry.context.context import Context # type: ignore[import] 

33 

34 from langsmith.client import Client 

35 

# Module logger. It is registered under the "langsmith.client" name rather
# than under this module's own dotted path.
logger = logging.getLogger("langsmith.client")

# Process-wide executor sized to the CPU count reported by
# multiprocessing.cpu_count(). Within this file it is used to submit
# client._send_compressed_multipart_req calls.
LANGSMITH_CLIENT_THREAD_POOL = cf.ThreadPoolExecutor(max_workers=cpu_count())

39 

40 

def _group_batch_by_api_endpoint(
    batch: list[TracingQueueItem],
) -> dict[tuple[Optional[str], Optional[str]], list[TracingQueueItem]]:
    """Bucket queue items by the ``(api_url, api_key)`` pair they carry.

    Args:
        batch: Queue items to partition. Each item must expose ``api_url``
            and ``api_key`` attributes.

    Returns:
        A ``defaultdict(list)`` keyed by ``(api_url, api_key)``. Keys appear
        in first-seen order and every bucket keeps the relative order its
        items had in ``batch``.
    """
    from collections import defaultdict

    buckets: defaultdict = defaultdict(list)
    for queued in batch:
        buckets[queued.api_url, queued.api_key].append(queued)
    return buckets

52 

53 

@functools.total_ordering
class TracingQueueItem:
    """An item in the tracing queue.

    Ordering and equality look only at ``priority`` and at the class of the
    wrapped operation; the payload itself is never compared. Because
    ``__eq__`` is defined without ``__hash__``, instances are unhashable.

    Attributes:
        priority (str): The priority of the item.
        item (Union[SerializedRunOperation, SerializedFeedbackOperation]):
            The item itself.
        api_url (Optional[str]): API URL stored alongside the item, if any.
        api_key (Optional[str]): API key stored alongside the item, if any.
        otel_context (Optional[Context]): The OTEL context of the item.
    """

    priority: str
    item: Union[SerializedRunOperation, SerializedFeedbackOperation]
    api_url: Optional[str]
    api_key: Optional[str]
    otel_context: Optional[Context]

    __slots__ = ("priority", "item", "api_key", "api_url", "otel_context")

    def __init__(
        self,
        priority: str,
        item: Union[SerializedRunOperation, SerializedFeedbackOperation],
        api_key: Optional[str] = None,
        api_url: Optional[str] = None,
        otel_context: Optional[Context] = None,
    ) -> None:
        self.priority = priority
        self.item = item
        self.api_key = api_key
        self.api_url = api_url
        self.otel_context = otel_context

    def _ordering_key(self) -> tuple[str, str, str]:
        """Return an orderable key: priority plus the operation class's name.

        Class objects do not support ``<``, so the tie-breaker uses the
        class's module and qualified name (plain strings) instead.
        """
        item_cls = self.item.__class__
        return (self.priority, item_cls.__module__, item_cls.__qualname__)

    def __lt__(self, other: TracingQueueItem) -> bool:
        if not isinstance(other, TracingQueueItem):
            return NotImplemented
        # Comparing (priority, class) tuples directly raises TypeError when
        # priorities tie and the classes differ, so compare string keys.
        return self._ordering_key() < other._ordering_key()

    def __eq__(self, other: object) -> bool:
        return isinstance(other, TracingQueueItem) and (
            self.priority,
            self.item.__class__,
        ) == (other.priority, other.item.__class__)

97 

98 

def _tracing_thread_drain_queue(
    tracing_queue: Queue, limit: int = 100, block: bool = True, max_size_bytes: int = 0
) -> list[TracingQueueItem]:
    """Pull the next batch of items off ``tracing_queue``.

    Waits up to 250ms for the first item, then keeps draining with a 50ms
    timeout per item. The shorter drain timeout is used instead of
    non-blocking calls to avoid creating too many small batches. When
    ``block`` is False, ``Queue.get`` ignores the timeouts and returns
    immediately.

    Args:
        tracing_queue: Queue to read from.
        limit: Maximum number of items in the batch. A falsy value disables
            the count limit.
        block: Passed through to ``Queue.get``.
        max_size_bytes: Soft cap on the summed
            ``item.item.calculate_serialized_size()`` of the batch. The item
            that crosses the cap is still included. ``0`` disables the cap,
            in which case sizes are never computed.

    Returns:
        The drained items in queue order; empty if nothing was available.
    """
    next_batch: list[TracingQueueItem] = []
    current_size = 0

    def _batch_is_full(latest: TracingQueueItem) -> bool:
        """Account for the just-appended item and report whether to stop."""
        nonlocal current_size
        if max_size_bytes > 0:
            current_size += latest.item.calculate_serialized_size()
            if current_size > max_size_bytes:
                return True
        return bool(limit) and len(next_batch) >= limit

    try:
        if item := tracing_queue.get(block=block, timeout=0.25):
            next_batch.append(item)
            # Apply both limits to the first item as well; checking the count
            # only inside the loop below let ``limit=1`` yield two items.
            if _batch_is_full(item):
                return next_batch

        while True:
            item = tracing_queue.get(block=block, timeout=0.05)
            next_batch.append(item)
            if _batch_is_full(item):
                break
    except Empty:
        # Queue ran dry; return whatever was collected so far.
        pass
    return next_batch

143 

144 

def _tracing_thread_drain_compressed_buffer(
    client: Client, size_limit: int = 100, size_limit_bytes: int | None = 20_971_520
) -> tuple[Optional[io.BytesIO], Optional[tuple[int, int]]]:
    """Detach the client's compressed trace buffer once a threshold is met.

    Args:
        client: Client whose ``compressed_traces`` buffer is inspected.
        size_limit: Trace-count threshold. Must be positive when not None.
        size_limit_bytes: Uncompressed-size threshold. It is overridden by
            ``client._max_batch_size_bytes`` whenever that value is truthy.
            Must be nonnegative when not None.

    Returns:
        ``(buffer, (uncompressed_size, compressed_size))`` with the buffer
        rewound to position 0 when either threshold has been reached.
        ``(None, None)`` when compression is not set up, when both values are
        still below their thresholds, or when any exception occurred (the
        exception is logged, not raised).
    """
    try:
        traces = client.compressed_traces
        if traces is None:
            return None, None

        with traces.lock:
            pre_compressed_size = traces.uncompressed_size

            size_limit_bytes = client._max_batch_size_bytes or size_limit_bytes

            if size_limit is not None and size_limit <= 0:
                raise ValueError(f"size_limit must be positive; got {size_limit}")
            if size_limit_bytes is not None and size_limit_bytes < 0:
                raise ValueError(
                    f"size_limit_bytes must be nonnegative; got {size_limit_bytes}"
                )

            below_byte_threshold = (
                size_limit_bytes is None or pre_compressed_size < size_limit_bytes
            )
            # The trace count is only consulted when the byte check passed.
            if below_byte_threshold and (
                size_limit is None or traces.trace_count < size_limit
            ):
                return None, None

            # Terminate the multipart body, then finish the compression stream.
            writer = traces.compressor_writer
            writer.write(f"--{_BOUNDARY}--\r\n".encode())
            writer.close()
            compressed_size = traces.buffer.tell()

            filled_buffer = traces.buffer
            # Attach the stored context to the buffer that is handed back.
            cast(Any, filled_buffer).context = traces._context

            compressed_traces_info = (pre_compressed_size, compressed_size)

            # Reset the shared state; the filled buffer was captured above.
            traces.reset()

        filled_buffer.seek(0)
        return (filled_buffer, compressed_traces_info)
    except Exception:
        logger.error(
            "LangSmith tracing error: Failed to submit trace data.\n"
            "This does not affect your application's runtime.\n"
            "Error details:",
            exc_info=True,
        )
        # Swallow after logging so the calling background thread keeps running.
        return None, None

200 

201 

def _process_buffered_run_ops_batch(
    client: Client, batch_to_process: list[tuple[str, dict]]
) -> None:
    """Run the client's buffered-run transformation and forward the results.

    Args:
        client: Client providing ``_process_buffered_run_ops``,
            ``_create_run``, ``_update_run`` and ``_data_available_event``.
        batch_to_process: ``(operation, run_dict)`` pairs. ``"post"`` pairs
            go to ``client._create_run`` and ``"patch"`` pairs to
            ``client._update_run``; any other operation is skipped.

    The transformation must return the same number of runs with the same
    ``"id"`` values in the same order. Any failure, including a violated
    invariant, is logged and swallowed so the calling thread keeps running.
    """
    try:
        operations = [operation for operation, _ in batch_to_process]
        run_dicts = [payload for _, payload in batch_to_process]
        # Snapshot the ids before the transformation gets to touch the dicts.
        original_ids = [payload.get("id") for payload in run_dicts]

        transform = client._process_buffered_run_ops
        if transform is None:
            raise RuntimeError(
                "process_buffered_run_ops should not be None when processing batch"
            )
        processed_runs = list(transform(run_dicts))

        if len(processed_runs) != len(run_dicts):
            raise ValueError(
                f"process_buffered_run_ops must return the same number of runs. "
                f"Expected {len(run_dicts)}, got {len(processed_runs)}"
            )

        processed_ids = [payload.get("id") for payload in processed_runs]
        if processed_ids != original_ids:
            raise ValueError(
                f"process_buffered_run_ops must preserve run IDs in the same order. "
                f"Expected {original_ids}, got {processed_ids}"
            )

        # Look the handler up lazily so unknown operations touch nothing.
        handler_names = {"post": "_create_run", "patch": "_update_run"}
        for operation, processed_run in zip(operations, processed_runs):
            handler_name = handler_names.get(operation)
            if handler_name is not None:
                getattr(client, handler_name)(processed_run)

        # Signal that new data is available.
        data_event = client._data_available_event
        if data_event:
            data_event.set()
    except Exception:
        logger.error(
            "LangSmith buffered run ops processing error: Failed to process batch.\n"
            "This does not affect your application's runtime.\n"
            "Error details:",
            exc_info=True,
        )

250 

251 

def _tracing_thread_handle_batch(
    client: Client,
    tracing_queue: Queue,
    batch: list[TracingQueueItem],
    use_multipart: bool,
    mark_task_done: bool = True,
    ops: Optional[
        list[Union[SerializedRunOperation, SerializedFeedbackOperation]]
    ] = None,
) -> None:
    """Send a batch of tracing queue items to LangSmith.

    Items are grouped by their ``(api_url, api_key)`` pair and each group is
    sent in its own ingest call. Failures are logged and reported through
    ``client._invoke_tracing_error_callback``; they are not re-raised.

    Args:
        client: The LangSmith client used to send data.
        tracing_queue: Queue the items came from (used for ``task_done``).
        batch: Tracing queue items to process.
        use_multipart: Send through the multipart endpoint when True,
            otherwise through the batch run endpoint, which drops feedback
            operations after logging a warning.
        mark_task_done: Whether to call ``task_done`` once per batch item
            afterwards. Set to False when called from parallel execution to
            avoid double counting.
        ops: Pre-combined serialized operations. When falsy, operations are
            combined from each group's items instead.
    """
    try:
        endpoint_groups = _group_batch_by_api_endpoint(batch)
        for (api_url, api_key), group_batch in endpoint_groups.items():
            if ops:
                # Keep only the pre-combined operations belonging to this group.
                group_ids = {queued.item.id for queued in group_batch}
                group_ops = [op for op in ops if op.id in group_ids]
            else:
                group_ops = combine_serialized_queue_operations(
                    [queued.item for queued in group_batch]
                )

            if use_multipart:
                client._multipart_ingest_ops(
                    group_ops, api_url=api_url, api_key=api_key
                )
                continue

            run_only_ops = [
                op
                for op in group_ops
                if not isinstance(op, SerializedFeedbackOperation)
            ]
            if len(run_only_ops) != len(group_ops):
                logger.warning(
                    "Feedback operations are not supported in non-multipart mode"
                )
            client._batch_ingest_run_ops(
                cast(list[SerializedRunOperation], run_only_ops),
                api_url=api_url,
                api_key=api_key,
            )
    except Exception as e:
        logger.error(
            "LangSmith tracing error: Failed to submit trace data.\n"
            "This does not affect your application's runtime.\n"
            "Error details:",
            exc_info=True,
        )
        client._invoke_tracing_error_callback(e)
    finally:
        if mark_task_done and tracing_queue is not None:
            for _ in batch:
                try:
                    tracing_queue.task_done()
                except ValueError as e:
                    if "task_done() called too many times" not in str(e):
                        raise
                    # This can happen during shutdown when multiple threads
                    # process the same queue items. It's harmless.
                    logger.debug(
                        f"Ignoring harmless task_done error during shutdown: {e}"
                    )

329 

330 

def _otel_tracing_thread_handle_batch(
    client: Client,
    tracing_queue: Queue,
    batch: list[TracingQueueItem],
    mark_task_done: bool = True,
    ops: Optional[
        list[Union[SerializedRunOperation, SerializedFeedbackOperation]]
    ] = None,
) -> None:
    """Export a batch of tracing queue items through the client's OTEL exporter.

    Only run operations are exported. Failures are logged and reported through
    ``client._invoke_tracing_error_callback``; they are not re-raised.

    Args:
        client: The LangSmith client holding ``otel_exporter``.
        tracing_queue: Queue the items came from (used for ``task_done``).
        batch: Tracing queue items to process.
        mark_task_done: Whether to call ``task_done`` once per batch item
            afterwards. Set to False when called from parallel execution to
            avoid double counting.
        ops: Pre-combined serialized operations. When None, operations are
            combined from the batch items.
    """
    try:
        combined_ops = (
            ops
            if ops is not None
            else combine_serialized_queue_operations(
                [queued.item for queued in batch]
            )
        )

        run_ops = [op for op in combined_ops if isinstance(op, SerializedRunOperation)]
        # Map each run operation's id to the OTEL context queued with it.
        otel_context_map = {
            queued.item.id: queued.otel_context
            for queued in batch
            if isinstance(queued.item, SerializedRunOperation)
        }

        if not run_ops:
            pass
        elif client.otel_exporter is None:
            logger.error(
                "LangSmith tracing error: Failed to submit OTEL trace data.\n"
                "This does not affect your application's runtime.\n"
                "Error details: client.otel_exporter is None"
            )
        else:
            client.otel_exporter.export_batch(run_ops, otel_context_map)
    except Exception as e:
        logger.error(
            "OTEL tracing error: Failed to submit trace data.\n"
            "This does not affect your application's runtime.\n"
            "Error details:",
            exc_info=True,
        )
        client._invoke_tracing_error_callback(e)
    finally:
        if mark_task_done and tracing_queue is not None:
            for _ in batch:
                try:
                    tracing_queue.task_done()
                except ValueError as e:
                    if "task_done() called too many times" not in str(e):
                        raise
                    # This can happen during shutdown when multiple threads
                    # process the same queue items. It's harmless.
                    logger.debug(
                        f"Ignoring harmless task_done error during shutdown: {e}"
                    )

393 

394 

def _hybrid_tracing_thread_handle_batch(
    client: Client,
    tracing_queue: Queue,
    batch: list[TracingQueueItem],
    use_multipart: bool,
    mark_task_done: bool = True,
) -> None:
    """Handle a batch of tracing queue items by sending to both LangSmith and OTEL.

    The batch's operations are combined once and each backend receives its
    own deep copy. The two sends run in parallel on a short-lived two-worker
    executor; if the interpreter is shutting down and refuses new futures,
    they run sequentially instead.

    Queue accounting happens in a ``finally`` block, matching the single
    backend handlers, so ``task_done`` is still called when combining,
    copying or dispatching raises. Otherwise a ``Queue.join()`` on the
    tracing queue could wait forever for those items.

    Args:
        client: The LangSmith client to use for sending data.
        tracing_queue: The queue containing tracing items (used for task_done calls).
        batch: List of tracing queue items to process.
        use_multipart: Whether to use multipart endpoint for LangSmith.
        mark_task_done: Whether to mark queue tasks as done after processing.
            Set to False primarily for testing when items weren't actually queued.

    Raises:
        RuntimeError: Re-raised from the executor when it is not the
            interpreter-shutdown case handled here.
    """
    try:
        # Combine operations once to avoid race conditions
        ops = combine_serialized_queue_operations([item.item for item in batch])

        # Create copies for each thread to avoid shared mutation
        langsmith_ops = copy.deepcopy(ops)
        otel_ops = copy.deepcopy(ops)

        try:
            # Use ThreadPoolExecutor for parallel execution
            with cf.ThreadPoolExecutor(max_workers=2) as executor:
                future_langsmith = executor.submit(
                    _tracing_thread_handle_batch,
                    client,
                    tracing_queue,
                    batch,
                    use_multipart,
                    False,  # Don't mark tasks done - we'll do it once at the end
                    langsmith_ops,
                )
                future_otel = executor.submit(
                    _otel_tracing_thread_handle_batch,
                    client,
                    tracing_queue,
                    batch,
                    False,  # Don't mark tasks done - we'll do it once at the end
                    otel_ops,
                )

                # Wait for both to complete
                future_langsmith.result()
                future_otel.result()
        except RuntimeError as e:
            if "cannot schedule new futures after interpreter shutdown" in str(e):
                # During interpreter shutdown, ThreadPoolExecutor is blocked,
                # fall back to sequential processing
                logger.debug(
                    "Interpreter shutting down, falling back to sequential processing"
                )
                _tracing_thread_handle_batch(
                    client, tracing_queue, batch, use_multipart, False, langsmith_ops
                )
                _otel_tracing_thread_handle_batch(
                    client, tracing_queue, batch, False, otel_ops
                )
            else:
                raise
    finally:
        # Mark all tasks as done once, only if requested, even on failure.
        if mark_task_done and tracing_queue is not None:
            for _ in batch:
                try:
                    tracing_queue.task_done()
                except ValueError as e:
                    if "task_done() called too many times" in str(e):
                        # This can happen during shutdown when multiple threads
                        # process the same queue items. It's harmless.
                        logger.debug(
                            f"Ignoring harmless task_done error during shutdown: {e}"
                        )
                    else:
                        raise

474 

475 

def _is_using_internal_otlp_provider(client: Client) -> bool:
    """Report whether the global OTEL TracerProvider is LangSmith's own.

    Returns False when the client has no ``otel_exporter``, when the
    ``OTEL_ENABLED`` env var is not truish, when the global provider exposes
    no ``resource.attributes``, or when anything goes wrong while checking.
    Otherwise returns the value of the provider's
    ``langsmith.internal_provider`` resource attribute (False if absent).
    """
    if getattr(client, "otel_exporter", None) is None:
        return False

    try:
        if not ls_utils.is_env_var_truish("OTEL_ENABLED"):
            return False

        # Imported lazily; a failing import is treated as "user-provided".
        from opentelemetry import trace  # type: ignore[import]

        tracer_provider = trace.get_tracer_provider()
        resource = getattr(tracer_provider, "resource", None)
        if not hasattr(resource, "attributes"):
            return False

        is_internal = resource.attributes.get("langsmith.internal_provider", False)
        logger.debug(
            f"TracerProvider resource check: "
            f"langsmith.internal_provider={is_internal}"
        )
        return is_internal
    except Exception as e:
        logger.debug(
            f"Could not determine TracerProvider type: {e}, assuming user-provided"
        )
        return False

513 

514 

def get_size_limit_from_env() -> Optional[int]:
    """Read the ``BATCH_INGEST_SIZE_LIMIT`` setting as an integer.

    Returns:
        The parsed integer, or None when the setting is absent or cannot be
        parsed by ``int()`` (a warning is logged in the latter case).
    """
    size_limit_str = ls_utils.get_env_var(
        "BATCH_INGEST_SIZE_LIMIT",
    )
    if size_limit_str is None:
        return None

    try:
        parsed_limit = int(size_limit_str)
    except ValueError:
        logger.warning(
            f"Invalid value for BATCH_INGEST_SIZE_LIMIT: {size_limit_str}, "
            "continuing with default"
        )
        return None
    return parsed_limit

528 

529 

def _ensure_ingest_config(
    info: ls_schemas.LangSmithInfo,
) -> ls_schemas.BatchIngestConfig:
    """Return a usable batch-ingest config for ``info``.

    When ``info`` carries a truthy ``batch_ingest_config``, that same object
    is returned. Its ``"size_limit"`` entry is overwritten in place first if
    ``get_size_limit_from_env()`` yields a value. In every other case,
    including any exception raised along the way, a locally built default
    config is returned; the env override is not applied to that default.
    """
    fallback_config = ls_schemas.BatchIngestConfig(
        use_multipart_endpoint=False,
        size_limit_bytes=None,  # Note this field is not used here
        size_limit=100,
        scale_up_nthreads_limit=_AUTO_SCALE_UP_NTHREADS_LIMIT,
        scale_up_qsize_trigger=_AUTO_SCALE_UP_QSIZE_TRIGGER,
        scale_down_nempty_trigger=_AUTO_SCALE_DOWN_NEMPTY_TRIGGER,
    )
    if not info:
        return fallback_config

    try:
        server_config = info.batch_ingest_config
        if not server_config:
            return fallback_config

        env_override = get_size_limit_from_env()
        if env_override is not None:
            server_config["size_limit"] = env_override
        return server_config
    except BaseException:
        # Best effort: any failure falls back to the defaults.
        return fallback_config

552 

553 

def get_tracing_mode() -> tuple[bool, bool]:
    """Derive the tracing mode from the OTEL environment flags.

    Returns:
        tuple[bool, bool]: ``(hybrid_otel_and_langsmith, is_otel_only)``.
            - ``(False, False)`` when ``OTEL_ENABLED`` is not truish.
            - ``(True, False)`` when ``OTEL_ENABLED`` is truish and
              ``OTEL_ONLY`` is not: both OTEL and LangSmith tracing.
            - ``(False, True)`` when both flags are truish: OTEL only.
    """
    otel_enabled = ls_utils.is_env_var_truish("OTEL_ENABLED")
    otel_only = ls_utils.is_env_var_truish("OTEL_ONLY")

    if otel_enabled:
        # With OTEL on, exactly one of the two modes is active.
        return not otel_only, otel_only
    return False, False

575 

576 

def tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None:
    """Run the main background loop that ships queued tracing items.

    The loop drains ``client.tracing_queue`` in batches and hands each batch
    to the hybrid, OTEL-only or LangSmith-only handler according to
    ``get_tracing_mode()``. It starts extra sub-threads while the queue is
    longer than ``scale_up_qsize_trigger``, up to ``scale_up_nthreads_limit``.
    When compression is not disabled, the multipart endpoint is in use and
    the ``zstd_compression_enabled`` instance flag is set, it also
    initialises the client's compression state and starts the compression
    thread. After the loop ends, whatever is left in the queue is drained
    without blocking.

    Args:
        client_ref: Weak reference to the client. The function returns
            immediately if the referent is already gone.
    """
    client = client_ref()
    if client is None:
        return
    tracing_queue = client.tracing_queue
    assert tracing_queue is not None
    batch_ingest_config = _ensure_ingest_config(client.info)
    size_limit: int = batch_ingest_config["size_limit"]
    scale_up_nthreads_limit: int = batch_ingest_config["scale_up_nthreads_limit"]
    scale_up_qsize_trigger: int = batch_ingest_config["scale_up_qsize_trigger"]
    use_multipart = batch_ingest_config.get("use_multipart_endpoint", False)

    sub_threads: list[threading.Thread] = []
    # 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached
    num_known_refs = 3

    # Disable compression if explicitly set or if using OpenTelemetry
    disable_compression = (
        ls_utils.is_env_var_truish("DISABLE_RUN_COMPRESSION")
        or client.otel_exporter is not None
    )
    if not disable_compression and use_multipart:
        if not (client.info.instance_flags or {}).get(
            "zstd_compression_enabled", False
        ):
            logger.warning(
                "Run compression is not enabled. Please update to the latest "
                "version of LangSmith. Falling back to regular multipart ingestion."
            )
        else:
            client._futures = weakref.WeakSet()
            client.compressed_traces = CompressedTraces()
            client._data_available_event = threading.Event()
            threading.Thread(
                target=tracing_control_thread_func_compress_parallel,
                args=(weakref.ref(client),),
            ).start()

            # The compression thread holds one more reference to the client.
            num_known_refs += 1

    def keep_thread_active() -> bool:
        # if `client.cleanup()` was called, stop thread
        if not client or (
            hasattr(client, "_manual_cleanup") and client._manual_cleanup
        ):
            logger.debug("Client is being cleaned up, stopping tracing thread")
            return False
        if not threading.main_thread().is_alive():
            # main thread is dead. should not be active
            logger.debug("Main thread is dead, stopping tracing thread")
            return False

        if hasattr(sys, "getrefcount"):
            # check if client refs count indicates we're the only remaining
            # reference to the client
            should_keep_thread = sys.getrefcount(client) > num_known_refs + len(
                sub_threads
            )
            if not should_keep_thread:
                logger.debug(
                    "Client refs count indicates we're the only remaining reference "
                    "to the client, stopping tracing thread",
                )
            return should_keep_thread
        else:
            # in PyPy, there is no sys.getrefcount attribute
            # for now, keep thread alive
            return True

    # loop until keep_thread_active() reports that the thread should stop
    while keep_thread_active():
        # Drop finished sub-threads. Rebuild the list in place instead of
        # calling remove() while iterating, which skips elements and leaves
        # dead threads counted in the refcount check above.
        sub_threads[:] = [thread for thread in sub_threads if thread.is_alive()]
        if (
            len(sub_threads) < scale_up_nthreads_limit
            and tracing_queue.qsize() > scale_up_qsize_trigger
        ):
            new_thread = threading.Thread(
                target=_tracing_sub_thread_func,
                args=(weakref.ref(client), use_multipart),
            )
            sub_threads.append(new_thread)
            new_thread.start()

        hybrid_otel_and_langsmith, is_otel_only = get_tracing_mode()
        max_batch_size = (
            client._max_batch_size_bytes
            or batch_ingest_config.get("size_limit_bytes")
            or 0
        )
        if next_batch := _tracing_thread_drain_queue(
            tracing_queue, limit=size_limit, max_size_bytes=max_batch_size
        ):
            if hybrid_otel_and_langsmith:
                # Hybrid mode: both OTEL and LangSmith
                _hybrid_tracing_thread_handle_batch(
                    client, tracing_queue, next_batch, use_multipart
                )
            elif is_otel_only:
                # OTEL-only mode
                _otel_tracing_thread_handle_batch(client, tracing_queue, next_batch)
            else:
                # LangSmith-only mode
                _tracing_thread_handle_batch(
                    client, tracing_queue, next_batch, use_multipart
                )

    # drain the queue on exit - apply same logic
    hybrid_otel_and_langsmith, is_otel_only = get_tracing_mode()
    max_batch_size = (
        client._max_batch_size_bytes or batch_ingest_config.get("size_limit_bytes") or 0
    )
    while next_batch := _tracing_thread_drain_queue(
        tracing_queue, limit=size_limit, block=False, max_size_bytes=max_batch_size
    ):
        if hybrid_otel_and_langsmith:
            # Hybrid mode cleanup
            logger.debug("Hybrid mode cleanup")
            _hybrid_tracing_thread_handle_batch(
                client, tracing_queue, next_batch, use_multipart
            )
        elif is_otel_only:
            # OTEL-only cleanup
            logger.debug("OTEL-only cleanup")
            _otel_tracing_thread_handle_batch(client, tracing_queue, next_batch)
        else:
            # LangSmith-only cleanup
            logger.debug("LangSmith-only cleanup")
            _tracing_thread_handle_batch(
                client, tracing_queue, next_batch, use_multipart
            )
    logger.debug("Tracing control thread is shutting down")

710 

711 

def tracing_control_thread_func_compress_parallel(
    client_ref: weakref.ref[Client], flush_interval: float = 0.5
) -> None:
    """Run the background loop that ships the client's compressed trace buffer.

    Every iteration waits up to 50ms on ``client._data_available_event``.
    If the event fired, the buffer is drained with the configured limits and
    the send is submitted to the shared thread pool without waiting. If it
    did not fire and ``flush_interval`` seconds have passed since the last
    send, the buffer is drained with limits of 1 and the send is waited on.
    After the loop ends, one final drain-and-send is attempted.

    Args:
        client_ref: Weak reference to the client. The function returns
            immediately if the referent is already gone.
        flush_interval: Seconds without a send after which an idle tick
            attempts a flush.
    """
    client = client_ref()
    if client is None:
        return
    logger.debug("Tracing control thread func compress parallel called")
    if (
        client.compressed_traces is None
        or client._data_available_event is None
        or client._futures is None
    ):
        logger.error(
            "LangSmith tracing error: Required compression attributes not "
            "initialized.\nThis may affect trace submission but does not "
            "impact your application's runtime."
        )
        return

    batch_ingest_config = _ensure_ingest_config(client.info)
    size_limit: int = batch_ingest_config["size_limit"]
    size_limit_bytes = client._max_batch_size_bytes or batch_ingest_config.get(
        "size_limit_bytes", 20_971_520
    )
    # One for this func, one for the parent thread, one for getrefcount,
    # one for _get_data_type_cached
    num_known_refs = 4

    def keep_thread_active() -> bool:
        # if `client.cleanup()` was called, stop thread
        if not client or (
            hasattr(client, "_manual_cleanup") and client._manual_cleanup
        ):
            logger.debug("Client is being cleaned up, stopping compression thread")
            return False
        if not threading.main_thread().is_alive():
            # main thread is dead. should not be active
            logger.debug("Main thread is dead, stopping compression thread")
            return False
        if not hasattr(sys, "getrefcount"):
            # in PyPy, there is no sys.getrefcount attribute
            # for now, keep thread alive
            return True
        # check if client refs count indicates we're the only remaining
        # reference to the client
        should_keep_thread = sys.getrefcount(client) > num_known_refs
        if not should_keep_thread:
            logger.debug(
                "Client refs count indicates we're the only remaining reference "
                "to the client, stopping compression thread",
            )
        return should_keep_thread

    def send_and_wait(stream: io.BytesIO, info: Optional[tuple[int, int]]) -> None:
        # Submit to the shared pool and block until done. If the pool refuses
        # with RuntimeError, send on this thread instead. `client` is read
        # lazily so no extra reference to it is kept between calls.
        try:
            cf.wait(
                [
                    LANGSMITH_CLIENT_THREAD_POOL.submit(
                        client._send_compressed_multipart_req,
                        stream,
                        info,
                    )
                ]
            )
        except RuntimeError:
            client._send_compressed_multipart_req(
                stream,
                info,
            )

    last_flush_time = time.monotonic()

    while True:
        triggered = client._data_available_event.wait(timeout=0.05)
        if not keep_thread_active():
            break

        if not triggered:
            # Idle tick: flush whatever is buffered once the interval elapsed.
            if (time.monotonic() - last_flush_time) < flush_interval:
                continue
            (
                data_stream,
                compressed_traces_info,
            ) = _tracing_thread_drain_compressed_buffer(
                client, size_limit=1, size_limit_bytes=1
            )
            if data_stream is not None:
                send_and_wait(data_stream, compressed_traces_info)
                last_flush_time = time.monotonic()
            continue

        # Data arrived: clear the event and attempt a drain
        client._data_available_event.clear()
        data_stream, compressed_traces_info = _tracing_thread_drain_compressed_buffer(
            client, size_limit, size_limit_bytes
        )
        if data_stream is None:
            continue

        # Submit without waiting; the future is tracked on the client.
        try:
            future = LANGSMITH_CLIENT_THREAD_POOL.submit(
                client._send_compressed_multipart_req,
                data_stream,
                compressed_traces_info,
            )
            client._futures.add(future)
        except RuntimeError:
            client._send_compressed_multipart_req(
                data_stream,
                compressed_traces_info,
            )
        last_flush_time = time.monotonic()

    # Drain the buffer on exit (final flush)
    try:
        (
            final_data_stream,
            compressed_traces_info,
        ) = _tracing_thread_drain_compressed_buffer(
            client, size_limit=1, size_limit_bytes=1
        )
        if final_data_stream is not None:
            send_and_wait(final_data_stream, compressed_traces_info)
    except Exception:
        logger.error(
            "LangSmith tracing error: Failed during final cleanup.\n"
            "This does not affect your application's runtime.\n"
            "Error details:",
            exc_info=True,
        )
    logger.debug("Compressed traces control thread is shutting down")

855 

856 

def _tracing_sub_thread_func(
    client_ref: weakref.ref[Client],
    use_multipart: bool,
) -> None:
    """Run an auxiliary worker loop that helps drain the tracing queue.

    The worker processes batches with the handler matching
    ``get_tracing_mode()``. It stops when the main thread is no longer alive
    or once the count of consecutive empty drains exceeds the config's
    ``scale_down_nempty_trigger``. It then drains whatever is left without
    blocking.

    Args:
        client_ref: Weak reference to the client. The function returns
            immediately if the referent is gone, if ``client.info`` is falsy,
            or if reading ``client.info`` raises.
        use_multipart: Forwarded to the LangSmith and hybrid handlers.
    """
    client = client_ref()
    if client is None:
        return
    try:
        if not client.info:
            return
    except BaseException as e:
        logger.debug("Error in tracing control thread: %s", e)
        return
    tracing_queue = client.tracing_queue
    assert tracing_queue is not None
    batch_ingest_config = _ensure_ingest_config(client.info)
    size_limit = batch_ingest_config.get("size_limit", 100)
    seen_successive_empty_queues = 0

    # Keep going while the main thread is alive and the empty-drain streak
    # has not yet exceeded the scale-down trigger.
    while (
        threading.main_thread().is_alive()
        and seen_successive_empty_queues
        <= batch_ingest_config["scale_down_nempty_trigger"]
    ):
        max_batch_size = (
            client._max_batch_size_bytes
            or batch_ingest_config.get("size_limit_bytes")
            or 0
        )
        next_batch = _tracing_thread_drain_queue(
            tracing_queue, limit=size_limit, max_size_bytes=max_batch_size
        )
        if not next_batch:
            seen_successive_empty_queues += 1
            continue

        seen_successive_empty_queues = 0
        hybrid_otel_and_langsmith, is_otel_only = get_tracing_mode()
        if hybrid_otel_and_langsmith:
            # Hybrid mode: both OTEL and LangSmith
            _hybrid_tracing_thread_handle_batch(
                client, tracing_queue, next_batch, use_multipart
            )
        elif is_otel_only:
            # OTEL-only mode
            _otel_tracing_thread_handle_batch(client, tracing_queue, next_batch)
        else:
            # LangSmith-only mode
            _tracing_thread_handle_batch(
                client, tracing_queue, next_batch, use_multipart
            )

    # Final non-blocking drain; the mode is resolved once for the whole drain.
    hybrid_otel_and_langsmith, is_otel_only = get_tracing_mode()
    max_batch_size = (
        client._max_batch_size_bytes or batch_ingest_config.get("size_limit_bytes") or 0
    )
    while True:
        next_batch = _tracing_thread_drain_queue(
            tracing_queue, limit=size_limit, block=False, max_size_bytes=max_batch_size
        )
        if not next_batch:
            break
        if hybrid_otel_and_langsmith:
            # Hybrid mode cleanup
            _hybrid_tracing_thread_handle_batch(
                client, tracing_queue, next_batch, use_multipart
            )
        elif is_otel_only:
            # OTEL-only cleanup
            logger.debug("OTEL-only cleanup")
            _otel_tracing_thread_handle_batch(client, tracing_queue, next_batch)
        else:
            # LangSmith-only cleanup
            logger.debug("LangSmith-only cleanup")
            _tracing_thread_handle_batch(
                client, tracing_queue, next_batch, use_multipart
            )
    logger.debug("Tracing control sub-thread is shutting down")