Coverage for langsmith/_internal/_operations.py: 22%
196 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-12-11 16:15 -0800
« prev ^ index » next coverage.py v7.10.1, created at 2025-12-11 16:15 -0800
1from __future__ import annotations
3import itertools
4import logging
5import os
6import uuid
7from collections.abc import Iterable
8from io import BufferedReader
9from typing import Literal, Optional, Union, cast
11from langsmith import schemas as ls_schemas
12from langsmith._internal import _orjson
13from langsmith._internal._compressed_traces import CompressedTraces
14from langsmith._internal._multipart import MultipartPart, MultipartPartsAndContext
15from langsmith._internal._serde import dumps_json as _dumps_json
17logger = logging.getLogger(__name__)
class SerializedRunOperation:
    """A run "post" or "patch" operation whose payload is held as bytes.

    ``_none`` carries the main run object; the remaining byte fields hold the
    individually serialized pieces that were split out of it.
    """

    operation: Literal["post", "patch"]
    id: uuid.UUID
    trace_id: uuid.UUID

    # this is the whole object, minus the other fields which
    # are popped (inputs/outputs/events/attachments)
    _none: bytes

    inputs: Optional[bytes]
    outputs: Optional[bytes]
    events: Optional[bytes]
    extra: Optional[bytes]
    error: Optional[bytes]
    serialized: Optional[bytes]
    attachments: Optional[ls_schemas.Attachments]

    __slots__ = (
        "operation",
        "id",
        "trace_id",
        "_none",
        "inputs",
        "outputs",
        "events",
        "extra",
        "error",
        "serialized",
        "attachments",
    )

    def __init__(
        self,
        operation: Literal["post", "patch"],
        id: uuid.UUID,
        trace_id: uuid.UUID,
        _none: bytes,
        inputs: Optional[bytes] = None,
        outputs: Optional[bytes] = None,
        events: Optional[bytes] = None,
        extra: Optional[bytes] = None,
        error: Optional[bytes] = None,
        serialized: Optional[bytes] = None,
        attachments: Optional[ls_schemas.Attachments] = None,
    ) -> None:
        """Store the operation kind, identifiers and serialized payload pieces."""
        # Identity of the operation.
        self.operation = operation
        self.id = id
        self.trace_id = trace_id
        # Serialized payload pieces; everything except ``_none`` is optional.
        self._none = _none
        self.inputs = inputs
        self.outputs = outputs
        self.events = events
        self.extra = extra
        self.error = error
        self.serialized = serialized
        self.attachments = attachments

    def calculate_serialized_size(self) -> int:
        """Calculate actual serialized size of this operation.

        Sums the lengths of every non-empty byte field plus every attachment
        whose data is in-memory ``bytes``; attachments given in any other form
        (e.g. a path) contribute nothing.
        """
        byte_fields = (
            self._none,
            self.inputs,
            self.outputs,
            self.events,
            self.extra,
            self.error,
            self.serialized,
        )
        total = sum(len(chunk) for chunk in byte_fields if chunk)
        if self.attachments:
            total += sum(
                len(data_or_path)
                for _content_type, data_or_path in self.attachments.values()
                if isinstance(data_or_path, bytes)
            )
        return total

    def deserialize_run_info(self) -> dict:
        """Deserialize the main run info (_none and extra, error and serialized)."""
        run_info = _orjson.loads(self._none)
        # Re-attach each split-out field that is present, under its own key.
        for field_name in ("extra", "error", "serialized"):
            raw = getattr(self, field_name)
            if raw is not None:
                run_info[field_name] = _orjson.loads(raw)
        return run_info

    def _comparison_key(self) -> tuple:
        """Return every slot value, in ``__slots__`` order, for equality checks."""
        return (
            self.operation,
            self.id,
            self.trace_id,
            self._none,
            self.inputs,
            self.outputs,
            self.events,
            self.extra,
            self.error,
            self.serialized,
            self.attachments,
        )

    def __eq__(self, other: object) -> bool:
        """Return True when ``other`` is the same type with equal field values."""
        if not isinstance(other, SerializedRunOperation):
            return False
        return self._comparison_key() == other._comparison_key()
class SerializedFeedbackOperation:
    """A feedback record held as serialized bytes together with its identifiers."""

    id: uuid.UUID
    trace_id: uuid.UUID
    feedback: bytes

    __slots__ = ("id", "trace_id", "feedback")

    def __init__(self, id: uuid.UUID, trace_id: uuid.UUID, feedback: bytes) -> None:
        """Store the feedback id, its trace id and the serialized feedback body."""
        self.id = id
        self.trace_id = trace_id
        self.feedback = feedback

    def calculate_serialized_size(self) -> int:
        """Calculate actual serialized size of this operation."""
        payload = self.feedback
        return len(payload)

    def __eq__(self, other: object) -> bool:
        """Return True when ``other`` is the same type with equal field values."""
        if not isinstance(other, SerializedFeedbackOperation):
            return False
        mine = (self.id, self.trace_id, self.feedback)
        theirs = (other.id, other.trace_id, other.feedback)
        return mine == theirs
def serialize_feedback_dict(
    feedback: Union[ls_schemas.FeedbackCreate, dict],
) -> SerializedFeedbackOperation:
    """Serialize a feedback object or dict into a ``SerializedFeedbackOperation``.

    Objects exposing a callable ``dict`` attribute are converted through it;
    anything else is used as a dict directly (and is therefore updated in
    place). Missing ``id``/``trace_id`` entries are filled with fresh random
    UUIDs, and string values for those keys are parsed into ``uuid.UUID``.
    """
    to_dict = getattr(feedback, "dict", None)
    if callable(to_dict):
        feedback_create: dict = to_dict()  # type: ignore
    else:
        feedback_create = cast(dict, feedback)

    # Both identifier keys get the same treatment: generate when absent,
    # parse when given as a string, otherwise leave untouched.
    for key in ("id", "trace_id"):
        if key not in feedback_create:
            feedback_create[key] = uuid.uuid4()
        elif isinstance(feedback_create[key], str):
            feedback_create[key] = uuid.UUID(feedback_create[key])

    return SerializedFeedbackOperation(
        id=feedback_create["id"],
        trace_id=feedback_create["trace_id"],
        feedback=_dumps_json(feedback_create),
    )
def serialize_run_dict(
    operation: Literal["post", "patch"], payload: dict
) -> SerializedRunOperation:
    """Split a run payload into separately serialized pieces.

    The ``inputs``, ``outputs``, ``events``, ``extra``, ``error``,
    ``serialized`` and ``attachments`` keys are popped from ``payload`` (the
    dict is modified in place); what remains is serialized as the main body.
    ``payload`` must still contain ``id`` and ``trace_id``.
    """
    split_fields = ("inputs", "outputs", "events", "extra", "error", "serialized")
    # Pop everything first so the main body is serialized without these keys.
    popped = {name: payload.pop(name, None) for name in split_fields}
    attachments = payload.pop("attachments", None)

    return SerializedRunOperation(
        operation=operation,
        id=payload["id"],
        trace_id=payload["trace_id"],
        _none=_dumps_json(payload),
        attachments=attachments,
        # Absent (None) fields stay None rather than being encoded as JSON null.
        **{
            name: (_dumps_json(value) if value is not None else None)
            for name, value in popped.items()
        },
    )
def combine_serialized_queue_operations(
    ops: list[Union[SerializedRunOperation, SerializedFeedbackOperation]],
) -> list[Union[SerializedRunOperation, SerializedFeedbackOperation]]:
    """Fold "patch" run operations into the "post" operation with the same id.

    Post operations are updated in place. The result lists the post
    operations first, followed by everything that could not be merged
    (patches without a matching post, and non-run operations) in their
    original relative order.
    """
    # Index every post by run id; a later post with the same id replaces the
    # earlier one while keeping the first one's position.
    posts_by_id = {}
    for op in ops:
        if isinstance(op, SerializedRunOperation) and op.operation == "post":
            posts_by_id[op.id] = op

    unmerged: list[Union[SerializedRunOperation, SerializedFeedbackOperation]] = []
    for op in ops:
        if not isinstance(op, SerializedRunOperation):
            unmerged.append(op)
            continue
        if op.operation == "post":
            continue

        # must be patch
        target = posts_by_id.get(op.id)
        if target is None:
            unmerged.append(op)
            continue

        if op._none is not None and op._none != target._none:
            # TODO optimize this more - this would currently be slowest
            # for large payloads
            merged_body = _orjson.loads(target._none)
            patch_body = _orjson.loads(op._none)
            # Only non-null patch values override what the post already has.
            merged_body.update(
                (key, value) for key, value in patch_body.items() if value is not None
            )
            target._none = _orjson.dumps(merged_body)

        # Any field the patch carries replaces the post's value wholesale.
        for field_name in (
            "inputs",
            "outputs",
            "events",
            "extra",
            "error",
            "serialized",
        ):
            patched = getattr(op, field_name)
            if patched is not None:
                setattr(target, field_name, patched)

        # Attachments are merged key by key instead of being replaced.
        if op.attachments is not None:
            if target.attachments is None:
                target.attachments = {}
            target.attachments.update(op.attachments)

    return [*posts_by_id.values(), *unmerged]
def serialized_feedback_operation_to_multipart_parts_and_context(
    op: SerializedFeedbackOperation,
) -> MultipartPartsAndContext:
    """Wrap a serialized feedback operation as a single JSON multipart part.

    The part is named ``feedback.<id>`` and the context string identifies the
    trace and feedback ids.
    """
    body = op.feedback
    part_payload = (
        None,
        body,
        "application/json",
        {"Content-Length": str(len(body))},
    )
    feedback_part = (f"feedback.{op.id}", part_payload)
    context = f"trace={op.trace_id},id={op.id}"
    return MultipartPartsAndContext([feedback_part], context)
def serialized_run_operation_to_multipart_parts_and_context(
    op: SerializedRunOperation,
) -> tuple[MultipartPartsAndContext, dict[str, BufferedReader]]:
    """Convert a serialized run operation into multipart parts.

    Produces one JSON part for the main run body, one JSON part for each
    non-None payload field, and one part per attachment. Attachments whose
    name contains a period, or whose file cannot be found, are skipped with a
    warning.

    Args:
        op: The serialized run operation to convert.

    Returns:
        A tuple of the parts-and-context object and a dict holding every file
        object opened for a path-based attachment (keyed by the path plus a
        random suffix) — presumably so the caller can close them once the
        parts have been consumed.

    Raises:
        Whatever ``os.path.getsize``/``open`` raise other than
        ``FileNotFoundError``. Files already opened for earlier attachments
        are closed before such an error propagates.
    """
    acc_parts: list[MultipartPart] = []
    opened_files_dict: dict[str, BufferedReader] = {}
    # this is main object, minus inputs/outputs/events/attachments
    acc_parts.append(
        (
            f"{op.operation}.{op.id}",
            (
                None,
                op._none,
                "application/json",
                {"Content-Length": str(len(op._none))},
            ),
        )
    )
    for key, value in (
        ("inputs", op.inputs),
        ("outputs", op.outputs),
        ("events", op.events),
        ("extra", op.extra),
        ("error", op.error),
        ("serialized", op.serialized),
    ):
        if value is None:
            continue
        acc_parts.append(
            (
                f"{op.operation}.{op.id}.{key}",
                (
                    None,
                    value,
                    "application/json",
                    {"Content-Length": str(len(value))},
                ),
            ),
        )
    try:
        if op.attachments:
            for n, (content_type, data_or_path) in op.attachments.items():
                if "." in n:
                    # Lazy %-style args, matching the other warning below.
                    logger.warning(
                        "Skipping logging of attachment '%s' "
                        "for run %s:"
                        " Invalid attachment name. Attachment names must not"
                        " contain periods ('.'). Please rename the attachment"
                        " and try again.",
                        n,
                        op.id,
                    )
                    continue

                if isinstance(data_or_path, bytes):
                    acc_parts.append(
                        (
                            f"attachment.{op.id}.{n}",
                            (
                                None,
                                data_or_path,
                                content_type,
                                {"Content-Length": str(len(data_or_path))},
                            ),
                        )
                    )
                    continue

                # Anything that is not bytes is treated as a filesystem path.
                try:
                    file_size = os.path.getsize(data_or_path)
                    file = open(data_or_path, "rb")
                except FileNotFoundError:
                    logger.warning(
                        "Attachment file not found for run %s: %s",
                        op.id,
                        data_or_path,
                    )
                    continue
                # The random suffix keeps keys unique when the same path is
                # attached more than once.
                opened_files_dict[str(data_or_path) + str(uuid.uuid4())] = file
                acc_parts.append(
                    (
                        f"attachment.{op.id}.{n}",
                        (
                            None,
                            file,
                            f"{content_type}; length={file_size}",
                            {},
                        ),
                    )
                )
    except BaseException:
        # The dict of opened files never reaches the caller when we fail, so
        # close the handles here instead of leaving them to the garbage
        # collector.
        for opened_file in opened_files_dict.values():
            opened_file.close()
        raise
    return (
        MultipartPartsAndContext(acc_parts, f"trace={op.trace_id},id={op.id}"),
        opened_files_dict,
    )
def encode_multipart_parts_and_context(
    parts_and_context: MultipartPartsAndContext,
    boundary: str,
) -> Iterable[tuple[bytes, Union[bytes, BufferedReader]]]:
    """Yield ``(encoded_header_block, data)`` for each multipart part.

    The header block holds the boundary line, the ``Content-Disposition``
    (with a ``filename`` parameter only when the part has a truthy filename),
    the ``Content-Type``, any extra headers, and the blank line that ends the
    headers. The part data is passed through untouched.
    """
    for part_name, (filename, data, content_type, headers) in parts_and_context.parts:
        disposition = f'Content-Disposition: form-data; name="{part_name}"'
        if filename:
            disposition += f'; filename="{filename}"'

        extra_header_lines = "".join(
            f"{name}: {value}\r\n" for name, value in headers.items()
        )
        header_block = (
            f"--{boundary}\r\n"
            f"{disposition}"
            f"\r\nContent-Type: {content_type}\r\n"
            f"{extra_header_lines}"
            "\r\n"
        )
        yield (header_block.encode(), data)
def compress_multipart_parts_and_context(
    parts_and_context: MultipartPartsAndContext,
    compressed_traces: CompressedTraces,
    boundary: str,
) -> bool:
    """Compress multipart parts into the shared compressed buffer.

    Returns True if the parts were enqueued into the compressed buffer, or False
    if they were rejected because the configured in-memory size limit would be
    exceeded.
    """
    emit = compressed_traces.compressor_writer.write

    # Encode everything up front so the size check can run before any byte is
    # written to the compressor.
    staged: list[tuple[bytes, bytes]] = []
    pending_size = 0
    for header_block, payload in encode_multipart_parts_and_context(
        parts_and_context, boundary
    ):
        # Normalise to bytes
        if isinstance(payload, BufferedReader):
            payload = payload.read()
        elif not isinstance(payload, (bytes, bytearray)):
            payload = str(payload).encode()

        staged.append((header_block, payload))
        pending_size += len(payload)

    limit = getattr(compressed_traces, "max_uncompressed_size_bytes", None)
    if limit is not None and limit > 0:
        queued_size = compressed_traces.uncompressed_size
        # An empty queue always accepts the operation, whatever its size.
        over_limit = queued_size > 0 and queued_size + pending_size > limit
        if over_limit:
            logger.warning(
                "Compressed traces queue size limit (%s bytes) exceeded. "
                "Dropping trace data with context: %s. "
                "Current queue size: %s bytes, attempted addition: %s bytes.",
                limit,
                parts_and_context.context,
                queued_size,
                pending_size,
            )
            return False

    for header_block, payload in staged:
        emit(header_block)
        # Only payload bytes are counted towards the uncompressed size.
        compressed_traces.uncompressed_size += len(payload)
        emit(payload)
        emit(b"\r\n")  # part terminator

    compressed_traces._context.append(parts_and_context.context)
    return True