Coverage for langsmith/_internal/_compressed_traces.py: 0%
28 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-12-11 16:15 -0800
« prev ^ index » next coverage.py v7.10.1, created at 2025-12-11 16:15 -0800
1import io
2import threading
3from typing import Optional
5from zstandard import ZstdCompressor # type: ignore[import]
7from langsmith import utils as ls_utils
9compression_level = int(ls_utils.get_env_var("RUN_COMPRESSION_LEVEL") or 1)
10compression_threads = int(ls_utils.get_env_var("RUN_COMPRESSION_THREADS") or -1)
12DEFAULT_MAX_UNCOMPRESSED_QUEUE_BYTES = 1024 * 1024 * 1024 # 1GB
15class CompressedTraces:
16 def __init__(self, max_uncompressed_size_bytes: Optional[int] = None) -> None:
17 # Configure the maximum total uncompressed size for the in-memory queue.
18 if max_uncompressed_size_bytes is None:
19 max_bytes_str = ls_utils.get_env_var("MAX_INGEST_MEMORY_BYTES")
20 if max_bytes_str is not None:
21 max_uncompressed_size_bytes = int(max_bytes_str)
22 else:
23 max_uncompressed_size_bytes = DEFAULT_MAX_UNCOMPRESSED_QUEUE_BYTES
25 self.max_uncompressed_size_bytes = max_uncompressed_size_bytes
27 self.buffer: io.BytesIO = io.BytesIO()
28 self.trace_count: int = 0
29 self.lock = threading.Lock()
30 self.uncompressed_size: int = 0
31 self._context: list[str] = []
33 self.compressor_writer = ZstdCompressor(
34 level=compression_level, threads=compression_threads
35 ).stream_writer(self.buffer, closefd=False)
37 def reset(self) -> None:
38 self.buffer = io.BytesIO()
39 self.trace_count = 0
40 self.uncompressed_size = 0
41 self._context = []
42 self.compressor_writer = ZstdCompressor(
43 level=compression_level, threads=-1
44 ).stream_writer(self.buffer, closefd=False)