Coverage for langsmith/_internal/_background_thread.py: 20%
389 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-12-11 16:15 -0800
« prev ^ index » next coverage.py v7.10.1, created at 2025-12-11 16:15 -0800
1from __future__ import annotations
3import concurrent.futures as cf
4import copy
5import functools
6import io
7import logging
8import sys
9import threading
10import time
11import weakref
12from multiprocessing import cpu_count
13from queue import Empty, Queue
14from typing import TYPE_CHECKING, Any, Optional, Union, cast
16from langsmith import schemas as ls_schemas
17from langsmith import utils as ls_utils
18from langsmith._internal._compressed_traces import CompressedTraces
19from langsmith._internal._constants import (
20 _AUTO_SCALE_DOWN_NEMPTY_TRIGGER,
21 _AUTO_SCALE_UP_NTHREADS_LIMIT,
22 _AUTO_SCALE_UP_QSIZE_TRIGGER,
23 _BOUNDARY,
24)
25from langsmith._internal._operations import (
26 SerializedFeedbackOperation,
27 SerializedRunOperation,
28 combine_serialized_queue_operations,
29)
31if TYPE_CHECKING:
32 from opentelemetry.context.context import Context # type: ignore[import]
34 from langsmith.client import Client
36logger = logging.getLogger("langsmith.client")
38LANGSMITH_CLIENT_THREAD_POOL = cf.ThreadPoolExecutor(max_workers=cpu_count())
def _group_batch_by_api_endpoint(
    batch: list[TracingQueueItem],
) -> dict[tuple[Optional[str], Optional[str]], list[TracingQueueItem]]:
    """Bucket queue items by the endpoint credentials they carry.

    Args:
        batch: Queue items to partition.

    Returns:
        A mapping keyed by ``(api_url, api_key)``. Each value lists the items
        that share that pair, in the order they appeared in ``batch``.
    """
    from collections import defaultdict

    buckets = defaultdict(list)
    for queued in batch:
        buckets[queued.api_url, queued.api_key].append(queued)
    return buckets
@functools.total_ordering
class TracingQueueItem:
    """A unit of work waiting in the tracing queue.

    Ordering and equality look only at ``priority`` and the class of ``item``;
    the payload itself and the endpoint fields are ignored when comparing.

    Attributes:
        priority (str): The priority of the item.
        item (Union[SerializedRunOperation, SerializedFeedbackOperation]):
            The serialized operation carried by this entry.
        api_url (Optional[str]): API URL stored with the item, if any.
        api_key (Optional[str]): API key stored with the item, if any.
        otel_context (Optional[Context]): The OTEL context of the item.
    """

    priority: str
    item: Union[SerializedRunOperation, SerializedFeedbackOperation]
    api_url: Optional[str]
    api_key: Optional[str]
    otel_context: Optional[Context]

    __slots__ = ("priority", "item", "api_key", "api_url", "otel_context")

    def __init__(
        self,
        priority: str,
        item: Union[SerializedRunOperation, SerializedFeedbackOperation],
        api_key: Optional[str] = None,
        api_url: Optional[str] = None,
        otel_context: Optional[Context] = None,
    ) -> None:
        self.priority = priority
        self.item = item
        self.api_key = api_key
        self.api_url = api_url
        self.otel_context = otel_context

    def __lt__(self, other: TracingQueueItem) -> bool:
        mine = (self.priority, self.item.__class__)
        theirs = (other.priority, other.item.__class__)
        return mine < theirs

    def __eq__(self, other: object) -> bool:
        # Anything that is not a TracingQueueItem never compares equal.
        if not isinstance(other, TracingQueueItem):
            return False
        mine = (self.priority, self.item.__class__)
        theirs = (other.priority, other.item.__class__)
        return mine == theirs
def _tracing_thread_drain_queue(
    tracing_queue: Queue, limit: int = 100, block: bool = True, max_size_bytes: int = 0
) -> list[TracingQueueItem]:
    """Pull the next batch of items off the tracing queue.

    Args:
        tracing_queue: Queue to read from.
        limit: Maximum number of items per batch; a falsy value disables the
            count limit.
        block: Passed to ``Queue.get``. When False the timeouts are ignored
            and the call returns as soon as the queue is empty.
        max_size_bytes: When positive, stop once the summed
            ``item.item.calculate_serialized_size()`` exceeds this value. The
            item that crosses the limit is still included in the batch.

    Returns:
        The drained items, possibly empty.
    """
    next_batch: list[TracingQueueItem] = []
    current_size = 0

    try:
        # wait 250ms for the first item, then
        # - drain the queue with a 50ms block timeout
        # - stop draining if we hit either count or size limit
        # shorter drain timeout is used instead of non-blocking calls to
        # avoid creating too many small batches
        if item := tracing_queue.get(block=block, timeout=0.25):
            next_batch.append(item)
            if max_size_bytes > 0:
                current_size += item.item.calculate_serialized_size()
                # If first item already exceeds limit, return just this item
                if current_size > max_size_bytes:
                    return next_batch
            # The count limit must be honoured for the first item as well;
            # otherwise limit=1 would let the loop below add a second item.
            if limit and len(next_batch) >= limit:
                return next_batch

        # Continue draining until we hit count limit OR size limit
        while True:
            try:
                item = tracing_queue.get(block=block, timeout=0.05)
            except Empty:
                break

            # Add the item first
            next_batch.append(item)

            # Then check size limit AFTER adding the item
            if max_size_bytes > 0:
                current_size += item.item.calculate_serialized_size()
                # If we've exceeded size limit, stop here
                # (item is included in this batch)
                if current_size > max_size_bytes:
                    break

            # Check count limit AFTER adding the item
            if limit and len(next_batch) >= limit:
                break
    except Empty:
        # Nothing arrived while waiting for the first item.
        pass
    return next_batch
def _tracing_thread_drain_compressed_buffer(
    client: Client, size_limit: int = 100, size_limit_bytes: int | None = 20_971_520
) -> tuple[Optional[io.BytesIO], Optional[tuple[int, int]]]:
    """Detach the compressed trace buffer once a flush threshold is reached.

    Args:
        client: Client whose ``compressed_traces`` buffer is inspected.
        size_limit: Trace count at which the buffer is flushed.
        size_limit_bytes: Uncompressed size at which the buffer is flushed.
            A truthy ``client._max_batch_size_bytes`` takes precedence.

    Returns:
        ``(buffer, (uncompressed_size, compressed_size))`` with the buffer
        rewound to position 0, or ``(None, None)`` when compression is not
        set up, neither threshold has been reached, or an error occurred
        (the error is logged here rather than raised).
    """
    try:
        traces = client.compressed_traces
        if traces is None:
            return None, None

        with traces.lock:
            pre_compressed_size = traces.uncompressed_size
            size_limit_bytes = client._max_batch_size_bytes or size_limit_bytes

            if size_limit is not None and size_limit <= 0:
                raise ValueError(f"size_limit must be positive; got {size_limit}")
            if size_limit_bytes is not None and size_limit_bytes < 0:
                raise ValueError(
                    f"size_limit_bytes must be nonnegative; got {size_limit_bytes}"
                )

            under_byte_limit = (
                size_limit_bytes is None or pre_compressed_size < size_limit_bytes
            )
            # Keep accumulating while both the byte and the count threshold
            # are still unmet.
            if under_byte_limit and (
                size_limit is None or traces.trace_count < size_limit
            ):
                return None, None

            # Terminate the multipart body, then finish the compression stream.
            writer = traces.compressor_writer
            writer.write(f"--{_BOUNDARY}--\r\n".encode())
            writer.close()
            compressed_size = traces.buffer.tell()

            filled_buffer = traces.buffer
            # Attach the buffer's context so it travels with the payload.
            cast(Any, filled_buffer).context = traces._context

            compressed_traces_info = (pre_compressed_size, compressed_size)

            traces.reset()

        filled_buffer.seek(0)
        return filled_buffer, compressed_traces_info
    except Exception:
        # Log and swallow so the calling background thread keeps running.
        logger.error(
            "LangSmith tracing error: Failed to submit trace data.\n"
            "This does not affect your application's runtime.\n"
            "Error details:",
            exc_info=True,
        )
        return None, None
def _process_buffered_run_ops_batch(
    client: Client, batch_to_process: list[tuple[str, dict]]
) -> None:
    """Transform a batch of buffered run operations and hand them to the client.

    Args:
        client: Client providing ``_process_buffered_run_ops``,
            ``_create_run`` and ``_update_run``.
        batch_to_process: ``(operation, run_dict)`` pairs. ``"post"`` entries
            go to ``_create_run`` and ``"patch"`` entries to ``_update_run``;
            any other operation is skipped.

    Any exception is logged and swallowed, so nothing propagates to the caller.
    """
    try:
        operations = [operation for operation, _ in batch_to_process]
        run_dicts = [run_data for _, run_data in batch_to_process]
        original_ids = [run.get("id") for run in run_dicts]

        transform = client._process_buffered_run_ops
        if transform is None:
            raise RuntimeError(
                "process_buffered_run_ops should not be None when processing batch"
            )
        processed_runs = list(transform(run_dicts))

        # The user-supplied transform may rewrite runs, but it must neither
        # drop, add nor reorder them.
        if len(processed_runs) != len(run_dicts):
            raise ValueError(
                f"process_buffered_run_ops must return the same number of runs. "
                f"Expected {len(run_dicts)}, got {len(processed_runs)}"
            )
        processed_ids = [run.get("id") for run in processed_runs]
        if processed_ids != original_ids:
            raise ValueError(
                f"process_buffered_run_ops must preserve run IDs in the same order. "
                f"Expected {original_ids}, got {processed_ids}"
            )

        for operation, processed_run in zip(operations, processed_runs):
            if operation == "post":
                client._create_run(processed_run)
            elif operation == "patch":
                client._update_run(processed_run)

        # Signal that new data is available.
        data_event = client._data_available_event
        if data_event:
            data_event.set()
    except Exception:
        # Log errors but don't crash the background thread
        logger.error(
            "LangSmith buffered run ops processing error: Failed to process batch.\n"
            "This does not affect your application's runtime.\n"
            "Error details:",
            exc_info=True,
        )
def _tracing_thread_handle_batch(
    client: Client,
    tracing_queue: Queue,
    batch: list[TracingQueueItem],
    use_multipart: bool,
    mark_task_done: bool = True,
    ops: Optional[
        list[Union[SerializedRunOperation, SerializedFeedbackOperation]]
    ] = None,
) -> None:
    """Send a batch of tracing queue items to LangSmith.

    Items are grouped by ``(api_url, api_key)`` and each group is submitted
    separately. Failures are logged and forwarded to the client's tracing
    error callback instead of being raised.

    Args:
        client: The LangSmith client to use for sending data.
        tracing_queue: The queue the items came from (used for task_done calls).
        batch: List of tracing queue items to process.
        use_multipart: Whether to use the multipart endpoint for sending data.
        mark_task_done: Whether to mark queue tasks as done after processing.
            Set to False when called from parallel execution to avoid double
            counting.
        ops: Pre-combined serialized operations to use instead of combining
            from batch. If falsy, operations are combined from the batch items.
    """
    try:
        endpoint_groups = _group_batch_by_api_endpoint(batch)
        for (api_url, api_key), group_batch in endpoint_groups.items():
            if ops:
                # Pick out of the pre-combined ops those belonging to this group.
                wanted_ids = {queued.item.id for queued in group_batch}
                group_ops = [op for op in ops if op.id in wanted_ids]
            else:
                group_ops = combine_serialized_queue_operations(
                    [queued.item for queued in group_batch]
                )

            if use_multipart:
                client._multipart_ingest_ops(
                    group_ops, api_url=api_url, api_key=api_key
                )
                continue

            # Feedback operations are dropped on the non-multipart path.
            if any(isinstance(op, SerializedFeedbackOperation) for op in group_ops):
                logger.warning(
                    "Feedback operations are not supported in non-multipart mode"
                )
                group_ops = [
                    op
                    for op in group_ops
                    if not isinstance(op, SerializedFeedbackOperation)
                ]
            client._batch_ingest_run_ops(
                cast(list[SerializedRunOperation], group_ops),
                api_url=api_url,
                api_key=api_key,
            )
    except Exception as exc:
        logger.error(
            "LangSmith tracing error: Failed to submit trace data.\n"
            "This does not affect your application's runtime.\n"
            "Error details:",
            exc_info=True,
        )
        client._invoke_tracing_error_callback(exc)
    finally:
        if mark_task_done and tracing_queue is not None:
            for _ in batch:
                try:
                    tracing_queue.task_done()
                except ValueError as err:
                    if "task_done() called too many times" not in str(err):
                        raise
                    # This can happen during shutdown when multiple threads
                    # process the same queue items. It's harmless.
                    logger.debug(
                        f"Ignoring harmless task_done error during shutdown: {err}"
                    )
def _otel_tracing_thread_handle_batch(
    client: Client,
    tracing_queue: Queue,
    batch: list[TracingQueueItem],
    mark_task_done: bool = True,
    ops: Optional[
        list[Union[SerializedRunOperation, SerializedFeedbackOperation]]
    ] = None,
) -> None:
    """Export a batch of tracing queue items through the client's OTEL exporter.

    Only run operations are exported. Failures are logged and forwarded to
    the client's tracing error callback instead of being raised.

    Args:
        client: The LangSmith client containing the OTEL exporter.
        tracing_queue: The queue the items came from (used for task_done calls).
        batch: List of tracing queue items to process.
        mark_task_done: Whether to mark queue tasks as done after processing.
            Set to False when called from parallel execution to avoid double
            counting.
        ops: Pre-combined serialized operations to use instead of combining
            from batch. If None, operations are combined from the batch items.
    """
    try:
        if ops is None:
            ops = combine_serialized_queue_operations(
                [queued.item for queued in batch]
            )

        run_ops = [op for op in ops if isinstance(op, SerializedRunOperation)]
        # Map each queued run id to the OTEL context stored with it.
        otel_context_map = {
            queued.item.id: queued.otel_context
            for queued in batch
            if isinstance(queued.item, SerializedRunOperation)
        }
        if run_ops:
            exporter = client.otel_exporter
            if exporter is None:
                logger.error(
                    "LangSmith tracing error: Failed to submit OTEL trace data.\n"
                    "This does not affect your application's runtime.\n"
                    "Error details: client.otel_exporter is None"
                )
            else:
                exporter.export_batch(run_ops, otel_context_map)
    except Exception as exc:
        logger.error(
            "OTEL tracing error: Failed to submit trace data.\n"
            "This does not affect your application's runtime.\n"
            "Error details:",
            exc_info=True,
        )
        client._invoke_tracing_error_callback(exc)
    finally:
        if mark_task_done and tracing_queue is not None:
            for _ in batch:
                try:
                    tracing_queue.task_done()
                except ValueError as err:
                    if "task_done() called too many times" not in str(err):
                        raise
                    # This can happen during shutdown when multiple threads
                    # process the same queue items. It's harmless.
                    logger.debug(
                        f"Ignoring harmless task_done error during shutdown: {err}"
                    )
def _hybrid_tracing_thread_handle_batch(
    client: Client,
    tracing_queue: Queue,
    batch: list[TracingQueueItem],
    use_multipart: bool,
    mark_task_done: bool = True,
) -> None:
    """Handle a batch of tracing queue items by sending to both LangSmith and OTEL.

    The two exports run in parallel on a two-worker executor. If the
    interpreter is shutting down and no new futures can be scheduled, they
    run sequentially instead.

    Args:
        client: The LangSmith client to use for sending data.
        tracing_queue: The queue containing tracing items (used for task_done calls).
        batch: List of tracing queue items to process.
        use_multipart: Whether to use multipart endpoint for LangSmith.
        mark_task_done: Whether to mark queue tasks as done after processing.
            Set to False primarily for testing when items weren't actually queued.

    Raises:
        RuntimeError: Any executor error other than the interpreter-shutdown
            case is re-raised.
    """
    try:
        # Combine operations once to avoid race conditions
        ops = combine_serialized_queue_operations([item.item for item in batch])

        # Create copies for each thread to avoid shared mutation
        langsmith_ops = copy.deepcopy(ops)
        otel_ops = copy.deepcopy(ops)

        try:
            # Use ThreadPoolExecutor for parallel execution
            with cf.ThreadPoolExecutor(max_workers=2) as executor:
                # Submit both tasks
                future_langsmith = executor.submit(
                    _tracing_thread_handle_batch,
                    client,
                    tracing_queue,
                    batch,
                    use_multipart,
                    False,  # Don't mark tasks done - we'll do it once at the end
                    langsmith_ops,
                )
                future_otel = executor.submit(
                    _otel_tracing_thread_handle_batch,
                    client,
                    tracing_queue,
                    batch,
                    False,  # Don't mark tasks done - we'll do it once at the end
                    otel_ops,
                )

                # Wait for both to complete
                future_langsmith.result()
                future_otel.result()
        except RuntimeError as e:
            if "cannot schedule new futures after interpreter shutdown" in str(e):
                # During interpreter shutdown, ThreadPoolExecutor is blocked,
                # fall back to sequential processing
                logger.debug(
                    "Interpreter shutting down, falling back to sequential processing"
                )
                _tracing_thread_handle_batch(
                    client, tracing_queue, batch, use_multipart, False, langsmith_ops
                )
                _otel_tracing_thread_handle_batch(
                    client, tracing_queue, batch, False, otel_ops
                )
            else:
                raise
    finally:
        # Mark all tasks as done once, only if requested. This runs in a
        # ``finally`` so that an exception escaping above cannot leave the
        # queue's unfinished-task count permanently above zero.
        if mark_task_done and tracing_queue is not None:
            for _ in batch:
                try:
                    tracing_queue.task_done()
                except ValueError as e:
                    if "task_done() called too many times" in str(e):
                        # This can happen during shutdown when multiple threads
                        # process the same queue items. It's harmless.
                        logger.debug(
                            f"Ignoring harmless task_done error during shutdown: {e}"
                        )
                    else:
                        raise
def _is_using_internal_otlp_provider(client: Client) -> bool:
    """Report whether the global tracer provider is LangSmith's internal one.

    Returns the ``langsmith.internal_provider`` resource attribute of the
    global OpenTelemetry ``TracerProvider``. Returns False when the client
    has no OTEL exporter, ``OTEL_ENABLED`` is not truthy, the provider
    exposes no resource attributes, or the lookup fails for any reason.
    """
    if getattr(client, "otel_exporter", None) is None:
        return False

    try:
        # Nothing to inspect unless OTEL has been switched on.
        if not ls_utils.is_env_var_truish("OTEL_ENABLED"):
            return False

        from opentelemetry import trace  # type: ignore[import]

        tracer_provider = trace.get_tracer_provider()
        has_attributes = hasattr(tracer_provider, "resource") and hasattr(
            tracer_provider.resource, "attributes"
        )
        if not has_attributes:
            return False

        is_internal = tracer_provider.resource.attributes.get(
            "langsmith.internal_provider", False
        )
        logger.debug(
            f"TracerProvider resource check: "
            f"langsmith.internal_provider={is_internal}"
        )
        return is_internal
    except Exception as e:
        logger.debug(
            f"Could not determine TracerProvider type: {e}, assuming user-provided"
        )
        return False
def get_size_limit_from_env() -> Optional[int]:
    """Read the ``BATCH_INGEST_SIZE_LIMIT`` override via ``ls_utils.get_env_var``.

    Returns:
        The value parsed as an int, or None when the variable is unset or
        cannot be parsed (a warning is logged in the latter case).
    """
    raw_limit = ls_utils.get_env_var(
        "BATCH_INGEST_SIZE_LIMIT",
    )
    if raw_limit is None:
        return None
    try:
        return int(raw_limit)
    except ValueError:
        logger.warning(
            f"Invalid value for BATCH_INGEST_SIZE_LIMIT: {raw_limit}, "
            "continuing with default"
        )
    return None
def _ensure_ingest_config(
    info: ls_schemas.LangSmithInfo,
) -> ls_schemas.BatchIngestConfig:
    """Return the batch ingest config to use, falling back to built-in defaults.

    The config carried by ``info`` is used when present, with ``size_limit``
    replaced by the ``BATCH_INGEST_SIZE_LIMIT`` override if one is set. Note
    that the override is written into ``info.batch_ingest_config`` itself.
    The defaults are returned when ``info`` or its config is missing or
    reading it raises.
    """
    fallback = ls_schemas.BatchIngestConfig(
        use_multipart_endpoint=False,
        size_limit_bytes=None,  # Note this field is not used here
        size_limit=100,
        scale_up_nthreads_limit=_AUTO_SCALE_UP_NTHREADS_LIMIT,
        scale_up_qsize_trigger=_AUTO_SCALE_UP_QSIZE_TRIGGER,
        scale_down_nempty_trigger=_AUTO_SCALE_DOWN_NEMPTY_TRIGGER,
    )
    if not info:
        return fallback
    try:
        server_config = info.batch_ingest_config
        if not server_config:
            return fallback
        override = get_size_limit_from_env()
        if override is not None:
            server_config["size_limit"] = override
        return server_config
    except BaseException:
        return fallback
def get_tracing_mode() -> tuple[bool, bool]:
    """Work out which tracing backends are active from the environment.

    Returns:
        tuple[bool, bool]: ``(hybrid_otel_and_langsmith, is_otel_only)``.
            - ``(False, False)`` when ``OTEL_ENABLED`` is not truthy.
            - ``(True, False)`` when ``OTEL_ENABLED`` is truthy and
              ``OTEL_ONLY`` is not: data goes to both OTEL and LangSmith.
            - ``(False, True)`` when both are truthy: OTEL only.
    """
    otel_enabled = ls_utils.is_env_var_truish("OTEL_ENABLED")
    otel_only = ls_utils.is_env_var_truish("OTEL_ONLY")

    if otel_enabled:
        # With OTEL on, exactly one of the two flags is set.
        return (not otel_only, otel_only)
    return (False, False)
def tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None:
    """Run the main background loop that drains and submits the tracing queue.

    While active, the loop starts extra ``_tracing_sub_thread_func`` workers
    when the queue grows beyond the configured trigger, and dispatches every
    drained batch to the hybrid, OTEL-only or LangSmith-only handler chosen
    by ``get_tracing_mode``. When compression is usable it also starts the
    compression thread. Once ``keep_thread_active`` reports False, whatever
    is left in the queue is drained without blocking before returning.

    Args:
        client_ref: Weak reference to the owning client. The function returns
            immediately if it no longer resolves.
    """
    client = client_ref()
    if client is None:
        return
    tracing_queue = client.tracing_queue
    assert tracing_queue is not None
    batch_ingest_config = _ensure_ingest_config(client.info)
    size_limit: int = batch_ingest_config["size_limit"]
    scale_up_nthreads_limit: int = batch_ingest_config["scale_up_nthreads_limit"]
    scale_up_qsize_trigger: int = batch_ingest_config["scale_up_qsize_trigger"]
    use_multipart = batch_ingest_config.get("use_multipart_endpoint", False)

    sub_threads: list[threading.Thread] = []
    # 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached
    num_known_refs = 3

    # Disable compression if explicitly set or if using OpenTelemetry
    disable_compression = (
        ls_utils.is_env_var_truish("DISABLE_RUN_COMPRESSION")
        or client.otel_exporter is not None
    )
    if not disable_compression and use_multipart:
        if not (client.info.instance_flags or {}).get(
            "zstd_compression_enabled", False
        ):
            logger.warning(
                "Run compression is not enabled. Please update to the latest "
                "version of LangSmith. Falling back to regular multipart ingestion."
            )
        else:
            client._futures = weakref.WeakSet()
            client.compressed_traces = CompressedTraces()
            client._data_available_event = threading.Event()
            threading.Thread(
                target=tracing_control_thread_func_compress_parallel,
                args=(weakref.ref(client),),
            ).start()

            # The compression thread holds one more reference to the client.
            num_known_refs += 1

    def keep_thread_active() -> bool:
        # if `client.cleanup()` was called, stop thread
        if not client or (
            hasattr(client, "_manual_cleanup") and client._manual_cleanup
        ):
            logger.debug("Client is being cleaned up, stopping tracing thread")
            return False
        if not threading.main_thread().is_alive():
            # main thread is dead. should not be active
            logger.debug("Main thread is dead, stopping tracing thread")
            return False

        if hasattr(sys, "getrefcount"):
            # check if client refs count indicates we're the only remaining
            # reference to the client
            should_keep_thread = sys.getrefcount(client) > num_known_refs + len(
                sub_threads
            )
            if not should_keep_thread:
                logger.debug(
                    "Client refs count indicates we're the only remaining reference "
                    "to the client, stopping tracing thread",
                )
            return should_keep_thread
        else:
            # in PyPy, there is no sys.getrefcount attribute
            # for now, keep thread alive
            return True

    # loop until
    while keep_thread_active():
        # Prune finished workers. The list is rebuilt through slice
        # assignment rather than calling remove() while iterating it, which
        # would skip the entry following each removal and leave dead threads
        # counted. Slice assignment keeps the same list object that
        # keep_thread_active() reads.
        sub_threads[:] = [thread for thread in sub_threads if thread.is_alive()]
        if (
            len(sub_threads) < scale_up_nthreads_limit
            and tracing_queue.qsize() > scale_up_qsize_trigger
        ):
            new_thread = threading.Thread(
                target=_tracing_sub_thread_func,
                args=(weakref.ref(client), use_multipart),
            )
            sub_threads.append(new_thread)
            new_thread.start()

        hybrid_otel_and_langsmith, is_otel_only = get_tracing_mode()
        max_batch_size = (
            client._max_batch_size_bytes
            or batch_ingest_config.get("size_limit_bytes")
            or 0
        )
        if next_batch := _tracing_thread_drain_queue(
            tracing_queue, limit=size_limit, max_size_bytes=max_batch_size
        ):
            if hybrid_otel_and_langsmith:
                # Hybrid mode: both OTEL and LangSmith
                _hybrid_tracing_thread_handle_batch(
                    client, tracing_queue, next_batch, use_multipart
                )
            elif is_otel_only:
                # OTEL-only mode
                _otel_tracing_thread_handle_batch(client, tracing_queue, next_batch)
            else:
                # LangSmith-only mode
                _tracing_thread_handle_batch(
                    client, tracing_queue, next_batch, use_multipart
                )

    # drain the queue on exit - apply same logic
    hybrid_otel_and_langsmith, is_otel_only = get_tracing_mode()
    max_batch_size = (
        client._max_batch_size_bytes or batch_ingest_config.get("size_limit_bytes") or 0
    )
    while next_batch := _tracing_thread_drain_queue(
        tracing_queue, limit=size_limit, block=False, max_size_bytes=max_batch_size
    ):
        if hybrid_otel_and_langsmith:
            # Hybrid mode cleanup
            logger.debug("Hybrid mode cleanup")
            _hybrid_tracing_thread_handle_batch(
                client, tracing_queue, next_batch, use_multipart
            )
        elif is_otel_only:
            # OTEL-only cleanup
            logger.debug("OTEL-only cleanup")
            _otel_tracing_thread_handle_batch(client, tracing_queue, next_batch)
        else:
            # LangSmith-only cleanup
            logger.debug("LangSmith-only cleanup")
            _tracing_thread_handle_batch(
                client, tracing_queue, next_batch, use_multipart
            )
    logger.debug("Tracing control thread is shutting down")
def tracing_control_thread_func_compress_parallel(
    client_ref: weakref.ref[Client], flush_interval: float = 0.5
) -> None:
    """Run the background loop that flushes the client's compressed trace buffer.

    The loop wakes on ``client._data_available_event`` (or every 50ms) and
    sends the buffer when it reaches the configured limits. If no flush has
    happened for ``flush_interval`` seconds, any buffered traces are sent
    regardless of size. A final flush is attempted when the loop exits.

    Args:
        client_ref: Weak reference to the owning client. The function returns
            immediately if it no longer resolves.
        flush_interval: Seconds without a flush after which buffered data is
            sent even though the normal limits were not reached.
    """
    client = client_ref()
    if client is None:
        return
    logger.debug("Tracing control thread func compress parallel called")
    # All three attributes are needed by the loop below.
    if (
        client.compressed_traces is None
        or client._data_available_event is None
        or client._futures is None
    ):
        logger.error(
            "LangSmith tracing error: Required compression attributes not "
            "initialized.\nThis may affect trace submission but does not "
            "impact your application's runtime."
        )
        return

    batch_ingest_config = _ensure_ingest_config(client.info)
    size_limit: int = batch_ingest_config["size_limit"]
    size_limit_bytes = client._max_batch_size_bytes or batch_ingest_config.get(
        "size_limit_bytes", 20_971_520
    )
    # One for this func, one for the parent thread, one for getrefcount,
    # one for _get_data_type_cached
    num_known_refs = 4

    def keep_thread_active() -> bool:
        # if `client.cleanup()` was called, stop thread
        if not client or (
            hasattr(client, "_manual_cleanup") and client._manual_cleanup
        ):
            logger.debug("Client is being cleaned up, stopping compression thread")
            return False
        if not threading.main_thread().is_alive():
            # main thread is dead. should not be active
            logger.debug("Main thread is dead, stopping compression thread")
            return False
        if hasattr(sys, "getrefcount"):
            # check if client refs count indicates we're the only remaining
            # reference to the client
            should_keep_thread = sys.getrefcount(client) > num_known_refs
            if not should_keep_thread:
                logger.debug(
                    "Client refs count indicates we're the only remaining reference "
                    "to the client, stopping compression thread",
                )
            return should_keep_thread
        else:
            # in PyPy, there is no sys.getrefcount attribute
            # for now, keep thread alive
            return True

    last_flush_time = time.monotonic()

    while True:
        # Wake when data is signalled, or after 50ms to re-check liveness
        # and the flush interval.
        triggered = client._data_available_event.wait(timeout=0.05)
        if not keep_thread_active():
            break

        # If data arrived, clear the event and attempt a drain
        if triggered:
            client._data_available_event.clear()

            data_stream, compressed_traces_info = (
                _tracing_thread_drain_compressed_buffer
            )(client, size_limit, size_limit_bytes)
            # If we have data, submit the send request
            if data_stream is not None:
                try:
                    # Fire-and-forget: the future is only recorded in
                    # client._futures, not awaited here.
                    future = LANGSMITH_CLIENT_THREAD_POOL.submit(
                        client._send_compressed_multipart_req,
                        data_stream,
                        compressed_traces_info,
                    )
                    client._futures.add(future)
                except RuntimeError:
                    # Could not hand the work to the pool; send on this
                    # thread instead.
                    client._send_compressed_multipart_req(
                        data_stream,
                        compressed_traces_info,
                    )
                last_flush_time = time.monotonic()

        else:
            if (time.monotonic() - last_flush_time) >= flush_interval:
                # size_limit=1 makes the drain helper return the buffer as
                # soon as it holds at least one trace.
                (
                    data_stream,
                    compressed_traces_info,
                ) = _tracing_thread_drain_compressed_buffer(
                    client, size_limit=1, size_limit_bytes=1
                )
                if data_stream is not None:
                    try:
                        # Unlike the triggered path, block until this send
                        # has finished.
                        cf.wait(
                            [
                                LANGSMITH_CLIENT_THREAD_POOL.submit(
                                    client._send_compressed_multipart_req,
                                    data_stream,
                                    compressed_traces_info,
                                )
                            ]
                        )
                    except RuntimeError:
                        client._send_compressed_multipart_req(
                            data_stream,
                            compressed_traces_info,
                        )
                    last_flush_time = time.monotonic()

    # Drain the buffer on exit (final flush)
    try:
        (
            final_data_stream,
            compressed_traces_info,
        ) = _tracing_thread_drain_compressed_buffer(
            client, size_limit=1, size_limit_bytes=1
        )
        if final_data_stream is not None:
            try:
                cf.wait(
                    [
                        LANGSMITH_CLIENT_THREAD_POOL.submit(
                            client._send_compressed_multipart_req,
                            final_data_stream,
                            compressed_traces_info,
                        )
                    ]
                )
            except RuntimeError:
                client._send_compressed_multipart_req(
                    final_data_stream,
                    compressed_traces_info,
                )

    except Exception:
        logger.error(
            "LangSmith tracing error: Failed during final cleanup.\n"
            "This does not affect your application's runtime.\n"
            "Error details:",
            exc_info=True,
        )
    logger.debug("Compressed traces control thread is shutting down")
def _tracing_sub_thread_func(
    client_ref: weakref.ref[Client],
    use_multipart: bool,
) -> None:
    """Run an auxiliary worker that helps drain the tracing queue.

    The worker processes batches until the main thread is no longer alive or
    the run of consecutive empty drains exceeds the configured
    ``scale_down_nempty_trigger``. It then empties whatever is left in the
    queue without blocking.

    Args:
        client_ref: Weak reference to the owning client. The function returns
            immediately if it no longer resolves.
        use_multipart: Whether LangSmith submissions use the multipart endpoint.
    """
    client = client_ref()
    if client is None:
        return
    try:
        if not client.info:
            return
    except BaseException as e:
        logger.debug("Error in tracing control thread: %s", e)
        return
    tracing_queue = client.tracing_queue
    assert tracing_queue is not None
    batch_ingest_config = _ensure_ingest_config(client.info)
    size_limit = batch_ingest_config.get("size_limit", 100)
    empty_streak = 0

    while (
        threading.main_thread().is_alive()
        and empty_streak <= batch_ingest_config["scale_down_nempty_trigger"]
    ):
        byte_cap = (
            client._max_batch_size_bytes
            or batch_ingest_config.get("size_limit_bytes")
            or 0
        )
        pending = _tracing_thread_drain_queue(
            tracing_queue, limit=size_limit, max_size_bytes=byte_cap
        )
        if not pending:
            empty_streak += 1
            continue
        empty_streak = 0

        # The mode is re-read for every batch.
        send_to_both, otel_only = get_tracing_mode()
        if send_to_both:
            _hybrid_tracing_thread_handle_batch(
                client, tracing_queue, pending, use_multipart
            )
        elif otel_only:
            _otel_tracing_thread_handle_batch(client, tracing_queue, pending)
        else:
            _tracing_thread_handle_batch(
                client, tracing_queue, pending, use_multipart
            )

    # Final non-blocking drain; the mode is read once for all remaining batches.
    send_to_both, otel_only = get_tracing_mode()
    byte_cap = (
        client._max_batch_size_bytes or batch_ingest_config.get("size_limit_bytes") or 0
    )
    while True:
        pending = _tracing_thread_drain_queue(
            tracing_queue, limit=size_limit, block=False, max_size_bytes=byte_cap
        )
        if not pending:
            break
        if send_to_both:
            _hybrid_tracing_thread_handle_batch(
                client, tracing_queue, pending, use_multipart
            )
        elif otel_only:
            logger.debug("OTEL-only cleanup")
            _otel_tracing_thread_handle_batch(client, tracing_queue, pending)
        else:
            logger.debug("LangSmith-only cleanup")
            _tracing_thread_handle_batch(
                client, tracing_queue, pending, use_multipart
            )
    logger.debug("Tracing control sub-thread is shutting down")