Coverage for langsmith/_internal/_operations.py: 22%

196 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-12-11 16:15 -0800

1from __future__ import annotations 

2 

3import itertools 

4import logging 

5import os 

6import uuid 

7from collections.abc import Iterable 

8from io import BufferedReader 

9from typing import Literal, Optional, Union, cast 

10 

11from langsmith import schemas as ls_schemas 

12from langsmith._internal import _orjson 

13from langsmith._internal._compressed_traces import CompressedTraces 

14from langsmith._internal._multipart import MultipartPart, MultipartPartsAndContext 

15from langsmith._internal._serde import dumps_json as _dumps_json 

16 

# Module-level logger; used below to warn about skipped attachments and about
# trace data dropped when the compressed queue size limit is exceeded.
logger = logging.getLogger(__name__)

18 

19 

class SerializedRunOperation:
    """A run create ("post") or update ("patch") with pre-encoded JSON pieces.

    The bulky run fields (inputs, outputs, events, extra, error, serialized)
    are held as individually JSON-encoded byte strings so they can be shipped
    as separate multipart parts; ``_none`` holds the encoding of everything
    else. Attachments are kept as the raw mapping (not JSON-encoded).
    """

    operation: Literal["post", "patch"]
    id: uuid.UUID
    trace_id: uuid.UUID

    # JSON encoding of the whole run dict minus the fields that are stored
    # separately below (they are popped before this is encoded).
    _none: bytes

    inputs: Optional[bytes]
    outputs: Optional[bytes]
    events: Optional[bytes]
    extra: Optional[bytes]
    error: Optional[bytes]
    serialized: Optional[bytes]
    attachments: Optional[ls_schemas.Attachments]

    __slots__ = (
        "operation",
        "id",
        "trace_id",
        "_none",
        "inputs",
        "outputs",
        "events",
        "extra",
        "error",
        "serialized",
        "attachments",
    )

    def __init__(
        self,
        operation: Literal["post", "patch"],
        id: uuid.UUID,
        trace_id: uuid.UUID,
        _none: bytes,
        inputs: Optional[bytes] = None,
        outputs: Optional[bytes] = None,
        events: Optional[bytes] = None,
        extra: Optional[bytes] = None,
        error: Optional[bytes] = None,
        serialized: Optional[bytes] = None,
        attachments: Optional[ls_schemas.Attachments] = None,
    ) -> None:
        """Store the operation identity and its already-encoded payload pieces."""
        self.operation, self.id, self.trace_id = operation, id, trace_id
        self._none = _none
        self.inputs, self.outputs, self.events = inputs, outputs, events
        self.extra, self.error, self.serialized = extra, error, serialized
        self.attachments = attachments

    def calculate_serialized_size(self) -> int:
        """Return the total byte length of the encoded payload pieces.

        Every non-empty encoded field is counted, plus attachments whose data
        is held in memory as ``bytes``. Attachments given any other way (e.g.
        as a file path) contribute nothing to this figure.
        """
        encoded_fields = (
            self._none,
            self.inputs,
            self.outputs,
            self.events,
            self.extra,
            self.error,
            self.serialized,
        )
        total = sum(len(chunk) for chunk in encoded_fields if chunk)
        if self.attachments:
            total += sum(
                len(payload)
                for _mime, payload in self.attachments.values()
                if isinstance(payload, bytes)
            )
        return total

    def deserialize_run_info(self) -> dict:
        """Decode ``_none`` and merge back ``extra``, ``error`` and ``serialized``.

        Each of the three optional fields is decoded and stored under its own
        key only when it is present (not ``None``).
        """
        run_info = _orjson.loads(self._none)
        optional_fields = (
            ("extra", self.extra),
            ("error", self.error),
            ("serialized", self.serialized),
        )
        for field_name, encoded in optional_fields:
            if encoded is not None:
                run_info[field_name] = _orjson.loads(encoded)
        return run_info

    def _identity_tuple(self) -> tuple:
        """Return every slot value, in declaration order, for equality checks."""
        return tuple(
            getattr(self, slot) for slot in SerializedRunOperation.__slots__
        )

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, SerializedRunOperation):
            return False
        return self._identity_tuple() == other._identity_tuple()

140 

141 

class SerializedFeedbackOperation:
    """A feedback record whose JSON body has already been encoded to bytes."""

    id: uuid.UUID
    trace_id: uuid.UUID
    # JSON encoding of the complete feedback dict.
    feedback: bytes

    __slots__ = ("id", "trace_id", "feedback")

    def __init__(self, id: uuid.UUID, trace_id: uuid.UUID, feedback: bytes) -> None:
        """Store the feedback identity and its encoded body."""
        self.id, self.trace_id, self.feedback = id, trace_id, feedback

    def calculate_serialized_size(self) -> int:
        """Return the byte length of the encoded feedback body."""
        encoded_body = self.feedback
        return len(encoded_body)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, SerializedFeedbackOperation):
            return False
        mine = (self.id, self.trace_id, self.feedback)
        theirs = (other.id, other.trace_id, other.feedback)
        return mine == theirs

164 

165 

def serialize_feedback_dict(
    feedback: Union[ls_schemas.FeedbackCreate, dict],
) -> SerializedFeedbackOperation:
    """Normalize a feedback object or dict and encode it for the tracing queue.

    Objects exposing a callable ``dict()`` method (e.g. a ``FeedbackCreate``
    model) are converted through it. Plain dicts are shallow-copied so the
    caller's mapping is never modified. A missing ``id`` or ``trace_id`` is
    filled with a fresh random UUID, and string values for those keys are
    parsed into ``uuid.UUID``.

    Args:
        feedback: The feedback model or dict to serialize.

    Returns:
        A ``SerializedFeedbackOperation`` carrying the id, the trace id and the
        JSON-encoded feedback body.

    Raises:
        ValueError: If ``id`` or ``trace_id`` is a string that is not a valid
            UUID.
    """
    to_dict = getattr(feedback, "dict", None)
    if callable(to_dict):
        feedback_create: dict = to_dict()
    else:
        # Copy so generated / parsed ids are not written back into the
        # caller's dict.
        feedback_create = dict(cast(dict, feedback))

    for key in ("id", "trace_id"):
        if key not in feedback_create:
            feedback_create[key] = uuid.uuid4()
        elif isinstance(feedback_create[key], str):
            feedback_create[key] = uuid.UUID(feedback_create[key])

    return SerializedFeedbackOperation(
        id=feedback_create["id"],
        trace_id=feedback_create["trace_id"],
        feedback=_dumps_json(feedback_create),
    )

187 

188 

def serialize_run_dict(
    operation: Literal["post", "patch"], payload: dict
) -> SerializedRunOperation:
    """Split a run dict into separately JSON-encoded pieces.

    ``payload`` is mutated: the keys ``inputs``, ``outputs``, ``events``,
    ``extra``, ``error``, ``serialized`` and ``attachments`` are popped from
    it. What remains (which must include ``id`` and ``trace_id``) is encoded
    as ``_none``. Each popped JSON field is encoded on its own when present;
    attachments are passed through unencoded.
    """
    json_fields = ("inputs", "outputs", "events", "extra", "error", "serialized")
    # Detach the large fields first so they are excluded from ``_none``.
    detached = {field: payload.pop(field, None) for field in json_fields}
    attachments = payload.pop("attachments", None)

    run_id = payload["id"]
    trace_id = payload["trace_id"]
    remainder = _dumps_json(payload)
    encoded = {
        field: (None if value is None else _dumps_json(value))
        for field, value in detached.items()
    }
    return SerializedRunOperation(
        operation=operation,
        id=run_id,
        trace_id=trace_id,
        _none=remainder,
        attachments=attachments,
        **encoded,
    )

212 

213 

def _apply_patch_to_create(
    create_op: SerializedRunOperation, patch_op: SerializedRunOperation
) -> None:
    """Fold a "patch" operation's data into its matching "post" operation in place."""
    if patch_op._none is not None and patch_op._none != create_op._none:
        # TODO optimize this more - this would currently be slowest
        # for large payloads
        merged = _orjson.loads(create_op._none)
        # Only non-null patch values override what the create already has.
        merged.update(
            (key, value)
            for key, value in _orjson.loads(patch_op._none).items()
            if value is not None
        )
        create_op._none = _orjson.dumps(merged)

    # Encoded fields are replaced wholesale when the patch supplies them.
    for field in ("inputs", "outputs", "events", "extra", "error", "serialized"):
        patched_value = getattr(patch_op, field)
        if patched_value is not None:
            setattr(create_op, field, patched_value)

    if patch_op.attachments is not None:
        if create_op.attachments is None:
            create_op.attachments = {}
        create_op.attachments.update(patch_op.attachments)


def combine_serialized_queue_operations(
    ops: list[Union[SerializedRunOperation, SerializedFeedbackOperation]],
) -> list[Union[SerializedRunOperation, SerializedFeedbackOperation]]:
    """Merge each "patch" run operation into the "post" with the same id.

    "post" operations are mutated in place to absorb their patches. When
    several posts share an id, only the last one is kept. Patches with no
    matching post in ``ops`` and all non-run operations are passed through
    unchanged.

    Returns:
        The (possibly merged) posts first, followed by the passthrough
        operations in their original order.
    """
    create_ops_by_id = {
        op.id: op
        for op in ops
        if isinstance(op, SerializedRunOperation) and op.operation == "post"
    }
    passthrough_ops: list[
        Union[SerializedRunOperation, SerializedFeedbackOperation]
    ] = []
    for op in ops:
        if not isinstance(op, SerializedRunOperation):
            passthrough_ops.append(op)
            continue
        if op.operation == "post":
            # Already collected in create_ops_by_id.
            continue
        # Anything else is a patch.
        target = create_ops_by_id.get(op.id)
        if target is None:
            passthrough_ops.append(op)
        else:
            _apply_patch_to_create(target, op)
    return [*create_ops_by_id.values(), *passthrough_ops]

266 

267 

def serialized_feedback_operation_to_multipart_parts_and_context(
    op: SerializedFeedbackOperation,
) -> MultipartPartsAndContext:
    """Wrap a serialized feedback payload as a single multipart part.

    The part is named ``feedback.<id>`` and carries the pre-encoded JSON bytes
    with an explicit ``Content-Length`` header. The context string records the
    trace id and the feedback id.
    """
    body = op.feedback
    part_headers = {"Content-Length": str(len(body))}
    feedback_part: MultipartPart = (
        f"feedback.{op.id}",
        (None, body, "application/json", part_headers),
    )
    context = f"trace={op.trace_id},id={op.id}"
    return MultipartPartsAndContext([feedback_part], context)

285 

286 

def _open_attachment_file(
    run_id: uuid.UUID, path: Union[str, os.PathLike]
) -> Optional[tuple[BufferedReader, int]]:
    """Open an attachment file for streaming and return it with its byte size.

    Returns None (after logging a warning) when the file cannot be opened, so
    one bad attachment path does not abort serialization of the whole run.
    """
    try:
        file = open(path, "rb")
    except FileNotFoundError:
        logger.warning("Attachment file not found for run %s: %s", run_id, path)
        return None
    except OSError as exc:
        # Permission denied, path is a directory, etc.
        logger.warning(
            "Could not open attachment file for run %s: %s (%s)", run_id, path, exc
        )
        return None
    try:
        # Size the descriptor actually opened rather than re-stat'ing the path,
        # so the advertised length matches the stream that will be read.
        file_size = os.fstat(file.fileno()).st_size
    except OSError:
        file.close()
        raise
    return file, file_size


def serialized_run_operation_to_multipart_parts_and_context(
    op: SerializedRunOperation,
) -> tuple[MultipartPartsAndContext, dict[str, BufferedReader]]:
    """Build the multipart parts for a serialized run operation.

    Emits one JSON part for ``_none`` (``<operation>.<id>``), one per present
    encoded field (``<operation>.<id>.<field>``), and one per attachment
    (``attachment.<id>.<name>``). In-memory attachments are sent as bytes;
    other attachments are opened as files and streamed.

    Attachments are skipped with a warning when their name contains a period
    or when their file cannot be opened.

    Returns:
        A tuple of the parts-and-context and a dict of file handles opened
        for path attachments. The handles are left open for the caller to
        consume (and close). If this function raises, any handles it opened
        are closed first.
    """
    acc_parts: list[MultipartPart] = []
    opened_files_dict: dict[str, BufferedReader] = {}
    # Main object: the run minus the separately encoded fields and attachments.
    acc_parts.append(
        (
            f"{op.operation}.{op.id}",
            (
                None,
                op._none,
                "application/json",
                {"Content-Length": str(len(op._none))},
            ),
        )
    )
    for key, value in (
        ("inputs", op.inputs),
        ("outputs", op.outputs),
        ("events", op.events),
        ("extra", op.extra),
        ("error", op.error),
        ("serialized", op.serialized),
    ):
        if value is None:
            continue
        acc_parts.append(
            (
                f"{op.operation}.{op.id}.{key}",
                (
                    None,
                    value,
                    "application/json",
                    {"Content-Length": str(len(value))},
                ),
            ),
        )
    if op.attachments:
        try:
            for n, (content_type, data_or_path) in op.attachments.items():
                if "." in n:
                    logger.warning(
                        f"Skipping logging of attachment '{n}' "
                        f"for run {op.id}:"
                        " Invalid attachment name. Attachment names must not contain"
                        " periods ('.'). Please rename the attachment and try again."
                    )
                    continue

                if isinstance(data_or_path, bytes):
                    acc_parts.append(
                        (
                            f"attachment.{op.id}.{n}",
                            (
                                None,
                                data_or_path,
                                content_type,
                                {"Content-Length": str(len(data_or_path))},
                            ),
                        )
                    )
                    continue

                opened = _open_attachment_file(op.id, data_or_path)
                if opened is None:
                    continue
                file, file_size = opened
                # Random suffix keeps keys unique if a path is attached twice.
                opened_files_dict[str(data_or_path) + str(uuid.uuid4())] = file
                acc_parts.append(
                    (
                        f"attachment.{op.id}.{n}",
                        (
                            None,
                            file,
                            f"{content_type}; length={file_size}",
                            {},
                        ),
                    )
                )
        except BaseException:
            # Don't leak descriptors opened for earlier attachments when a
            # later one fails unexpectedly; the caller never receives them.
            for handle in opened_files_dict.values():
                handle.close()
            raise
    return (
        MultipartPartsAndContext(acc_parts, f"trace={op.trace_id},id={op.id}"),
        opened_files_dict,
    )

374 

375 

# Characters that would terminate a quoted form-data parameter or start a new
# header line; percent-encoded as in the HTML multipart/form-data algorithm.
_FORM_DATA_PARAM_ESCAPES = str.maketrans({"\n": "%0A", "\r": "%0D", '"': "%22"})


def _escape_form_data_param(value: object) -> str:
    """Return *value* as a string safe to place inside a quoted header parameter."""
    return str(value).translate(_FORM_DATA_PARAM_ESCAPES)


def encode_multipart_parts_and_context(
    parts_and_context: MultipartPartsAndContext,
    boundary: str,
) -> Iterable[tuple[bytes, Union[bytes, BufferedReader]]]:
    """Yield ``(encoded_part_headers, part_data)`` for every multipart part.

    Each header block starts with the ``--<boundary>`` delimiter, followed by
    ``Content-Disposition`` (with ``filename`` when one is set),
    ``Content-Type``, any extra part headers, and the blank line that ends the
    headers. The part name and filename are escaped (LF, CR and ``"`` become
    ``%0A``, ``%0D`` and ``%22``) so user-supplied attachment names cannot
    break out of the quoted parameter. Data is yielded unchanged.
    """
    for part_name, (filename, data, content_type, headers) in parts_and_context.parts:
        disposition = (
            "Content-Disposition: form-data; "
            f'name="{_escape_form_data_param(part_name)}"'
        )
        if filename:
            disposition += f'; filename="{_escape_form_data_param(filename)}"'
        extra_headers = "".join(f"{k}: {v}\r\n" for k, v in headers.items())
        header_block = (
            f"--{boundary}\r\n"
            f"{disposition}\r\n"
            f"Content-Type: {content_type}\r\n"
            f"{extra_headers}"
            "\r\n"
        )
        yield (header_block.encode(), data)

398 

399 

def compress_multipart_parts_and_context(
    parts_and_context: MultipartPartsAndContext,
    compressed_traces: CompressedTraces,
    boundary: str,
) -> bool:
    """Compress multipart parts into the shared compressed buffer.

    Every part's data is first materialized as bytes so the operation's size
    is known. The whole operation is then either written to
    ``compressed_traces.compressor_writer`` (headers, data, and a CRLF
    terminator per part) or rejected as a unit.

    Args:
        parts_and_context: Parts to encode, plus a context string recorded on
            success and included in the log message on rejection.
        compressed_traces: Shared buffer. On success its ``uncompressed_size``
            grows by the data bytes written, and the context is appended to
            its ``_context`` list.
        boundary: Multipart boundary string used in each part header.

    Returns:
        True if the parts were enqueued into the compressed buffer, or False
        if they were rejected because the configured in-memory size limit
        would be exceeded. A limit that is missing, ``None`` or ``<= 0``
        disables the check. An empty buffer always accepts the operation,
        even if the operation alone exceeds the limit.
    """
    write = compressed_traces.compressor_writer.write

    parts: list[tuple[bytes, bytes]] = []
    # Only part data counts toward the limit (headers/terminators do not),
    # matching how uncompressed_size is incremented below.
    op_uncompressed_size = 0

    for headers, data in encode_multipart_parts_and_context(
        parts_and_context, boundary
    ):
        # Normalise to bytes.
        if isinstance(data, (bytes, bytearray)):
            payload = data
        elif isinstance(data, memoryview):
            payload = data.tobytes()
        elif hasattr(data, "read"):
            # Any file-like object (BufferedReader from open(..., "rb"),
            # BytesIO, ...) contributes its contents, never its repr.
            chunk = data.read()
            payload = chunk.encode() if isinstance(chunk, str) else chunk
        else:
            payload = str(data).encode()

        parts.append((headers, payload))
        op_uncompressed_size += len(payload)

    max_bytes = getattr(compressed_traces, "max_uncompressed_size_bytes", None)
    if max_bytes is not None and max_bytes > 0:
        current_size = compressed_traces.uncompressed_size
        if current_size > 0 and current_size + op_uncompressed_size > max_bytes:
            logger.warning(
                "Compressed traces queue size limit (%s bytes) exceeded. "
                "Dropping trace data with context: %s. "
                "Current queue size: %s bytes, attempted addition: %s bytes.",
                max_bytes,
                parts_and_context.context,
                current_size,
                op_uncompressed_size,
            )
            return False

    for headers, payload in parts:
        write(headers)
        compressed_traces.uncompressed_size += len(payload)
        write(payload)
        write(b"\r\n")  # part terminator

    compressed_traces._context.append(parts_and_context.context)
    return True