Coverage for langsmith/_internal/_compressed_traces.py: 0%

28 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-12-11 16:15 -0800

1import io 

2import threading 

3from typing import Optional 

4 

5from zstandard import ZstdCompressor # type: ignore[import] 

6 

7from langsmith import utils as ls_utils 

8 

9compression_level = int(ls_utils.get_env_var("RUN_COMPRESSION_LEVEL") or 1) 

10compression_threads = int(ls_utils.get_env_var("RUN_COMPRESSION_THREADS") or -1) 

11 

12DEFAULT_MAX_UNCOMPRESSED_QUEUE_BYTES = 1024 * 1024 * 1024 # 1GB 

13 

14 

15class CompressedTraces: 

16 def __init__(self, max_uncompressed_size_bytes: Optional[int] = None) -> None: 

17 # Configure the maximum total uncompressed size for the in-memory queue. 

18 if max_uncompressed_size_bytes is None: 

19 max_bytes_str = ls_utils.get_env_var("MAX_INGEST_MEMORY_BYTES") 

20 if max_bytes_str is not None: 

21 max_uncompressed_size_bytes = int(max_bytes_str) 

22 else: 

23 max_uncompressed_size_bytes = DEFAULT_MAX_UNCOMPRESSED_QUEUE_BYTES 

24 

25 self.max_uncompressed_size_bytes = max_uncompressed_size_bytes 

26 

27 self.buffer: io.BytesIO = io.BytesIO() 

28 self.trace_count: int = 0 

29 self.lock = threading.Lock() 

30 self.uncompressed_size: int = 0 

31 self._context: list[str] = [] 

32 

33 self.compressor_writer = ZstdCompressor( 

34 level=compression_level, threads=compression_threads 

35 ).stream_writer(self.buffer, closefd=False) 

36 

37 def reset(self) -> None: 

38 self.buffer = io.BytesIO() 

39 self.trace_count = 0 

40 self.uncompressed_size = 0 

41 self._context = [] 

42 self.compressor_writer = ZstdCompressor( 

43 level=compression_level, threads=-1 

44 ).stream_writer(self.buffer, closefd=False)