Coverage for langsmith/anonymizer.py: 42%

100 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-12-11 16:15 -0800

1import re # noqa 

2import inspect 

3from abc import abstractmethod 

4from collections import defaultdict 

5from typing import Any, Callable, Optional, TypedDict, Union 

6 

7 

class _ExtractOptions(TypedDict):
    """Options accepted by `_extract_string_nodes`."""

    max_depth: Optional[int]
    """Maximum nesting depth to traverse when extracting string nodes."""

13 

14 

class StringNode(TypedDict):
    """A single string found in the data, together with its location."""

    value: str
    """The string itself."""

    path: list[Union[str, int]]
    """Dict keys and/or list indices leading from the root of the data to the string."""

23 

24 

def _extract_string_nodes(data: Any, options: _ExtractOptions) -> list[StringNode]:
    """Collect every string reachable through nested dicts and lists.

    The data is walked breadth-first. Only ``dict`` and ``list`` containers are
    descended into; any other non-string value is ignored.

    Args:
        data: The value to search. It is not modified.
        options: Traversal options. A missing or falsy ``max_depth``
            (``None`` or ``0``) falls back to 10.

    Returns:
        One ``StringNode`` per string found, in breadth-first order. Each node
        carries the string and the keys/indices leading to it from ``data``
        (an empty path when ``data`` itself is a string).
    """
    # Function-scope import: a deque pops from the left in O(1), whereas
    # list.pop(0) shifts every remaining element on each pop.
    from collections import deque

    max_depth = options.get("max_depth") or 10

    queue: deque[tuple[Any, int, list[Union[str, int]]]] = deque([(data, 0, [])])
    result: list[StringNode] = []

    while queue:
        value, depth, path = queue.popleft()

        if isinstance(value, (dict, defaultdict)):
            # Containers sitting at the depth limit are not descended into.
            if depth >= max_depth:
                continue
            for key, nested_value in value.items():
                queue.append((nested_value, depth + 1, path + [key]))
        elif isinstance(value, list):
            if depth >= max_depth:
                continue
            for i, item in enumerate(value):
                queue.append((item, depth + 1, path + [i]))
        elif isinstance(value, str):
            # Strings are collected at whatever depth they were reached.
            result.append(StringNode(value=value, path=path))

    return result

51 

52 

class StringNodeProcessor:
    """Base interface for objects that mask a list of string nodes."""

    # NOTE: this class does not derive from ``abc.ABC``, so ``@abstractmethod``
    # is not enforced at instantiation time; subclasses are expected to
    # override ``mask_nodes``.
    @abstractmethod
    def mask_nodes(self, nodes: list[StringNode]) -> list[StringNode]:
        """Take the extracted string nodes and return the masked ones."""

59 

60 

class ReplacerOptions(TypedDict):
    """Settings that control how sensitive data is replaced."""

    max_depth: Optional[int]
    """Maximum nesting depth to traverse when extracting string nodes."""

    # NOTE(review): no code visible in this file reads ``deep_clone`` -
    # confirm where (or whether) it is honored.
    deep_clone: Optional[bool]
    """Whether to deep clone the data before replacing."""

69 

70 

class StringNodeRule(TypedDict):
    """A declarative regex rule describing what to replace and with what."""

    pattern: re.Pattern
    """Regular expression whose matches are replaced."""

    replace: Optional[str]
    """Text substituted for each match. Defaults to `[redacted]` if not specified."""

79 

80 

class RuleNodeProcessor(StringNodeProcessor):
    """Masks string nodes by applying a list of regex rules in order."""

    rules: list[StringNodeRule]
    """Normalized rules applied to every node.

    Each entry holds a compiled regex under ``pattern`` and the replacement
    string under ``replace``.
    """

    def __init__(self, rules: list[StringNodeRule]):
        """Normalize and store the rules.

        A ``pattern`` that is not already a compiled regex is compiled, and a
        ``replace`` that is missing or not a string becomes ``[redacted]``.
        """
        normalized = []
        for rule in rules:
            pattern = rule["pattern"]
            if not isinstance(pattern, re.Pattern):
                pattern = re.compile(pattern)

            replacement = rule.get("replace")
            if not isinstance(replacement, str):
                replacement = "[redacted]"

            normalized.append({"pattern": pattern, "replace": replacement})
        self.rules = normalized

    def mask_nodes(self, nodes: list[StringNode]) -> list[StringNode]:
        """Apply every rule to each node and return only the nodes that changed."""
        masked = []
        for node in nodes:
            original = node["value"]
            updated = original
            # Rules are chained: each one sees the output of the previous one.
            for rule in self.rules:
                updated = rule["pattern"].sub(rule["replace"], updated)
            if updated == original:
                continue
            masked.append(StringNode(value=updated, path=node["path"]))
        return masked

119 

120 

class CallableNodeProcessor(StringNodeProcessor):
    """String node processor that uses a callable function to replace sensitive data."""

    func: Union[Callable[[str], str], Callable[[str, list[Union[str, int]]], str]]
    """The callable function used to replace sensitive data.

    It can be either a function that takes a single string argument and returns
    a string, or a function that takes a string and a list of path elements
    (strings or integers) and returns a string."""

    accepts_path: bool
    """Indicates whether the callable function is called with a path argument.

    If True, the function is called with two positional arguments: the string to
    be processed and the path to that string.
    If False, the function is called with only the string to be processed."""

    def __init__(
        self,
        func: Union[Callable[[str], str], Callable[[str, list[Union[str, int]]], str]],
    ):
        """Initialize the processor with a callable function.

        Raises whatever ``inspect.signature`` raises when ``func`` cannot be
        introspected (for example, when it is not callable).
        """
        self.func = func
        self.accepts_path = self._detect_accepts_path(func)

    @staticmethod
    def _detect_accepts_path(func: Callable[..., Any]) -> bool:
        """Decide whether ``func`` should be called as ``func(value, path)``.

        The first guess is the parameter count (exactly two parameters means
        "takes a path"). That guess is kept whenever the corresponding call
        shape binds to the signature, so callables that already worked are
        treated as before. It is flipped only when the guessed shape cannot
        bind but the other one can, e.g. ``f(s, *, flag=False)`` or
        ``f(s, **kwargs)`` (two parameters, one positional) and
        ``f(s, path, extra=None)`` (three parameters, two required).
        """
        signature = inspect.signature(func)
        by_count = len(signature.parameters) == 2

        def binds(*args: Any) -> bool:
            try:
                signature.bind(*args)
            except TypeError:
                return False
            return True

        with_path = ("", [])
        without_path = ("",)
        guessed, other = (
            (with_path, without_path) if by_count else (without_path, with_path)
        )

        # Keep the count-based answer unless it is unusable and the
        # alternative call shape is the only one that works.
        if binds(*guessed) or not binds(*other):
            return by_count
        return not by_count

    def mask_nodes(self, nodes: list[StringNode]) -> list[StringNode]:
        """Run the callable on each node and return only the nodes whose value changed."""
        retval: list[StringNode] = []
        for node in nodes:
            candidate = (
                self.func(node["value"], node["path"])  # type: ignore[call-arg]
                if self.accepts_path
                else self.func(node["value"])  # type: ignore[call-arg]
            )
            if candidate != node["value"]:
                retval.append(StringNode(value=candidate, path=node["path"]))
        return retval

157 

158 

# Everything `create_anonymizer` accepts as a replacer: a callable taking the
# string (optionally followed by its path), a list of regex rules, or a
# ready-made processor. The one-argument callable form is listed because
# `CallableNodeProcessor` supports it.
ReplacerType = Union[
    Callable[[str], str],
    Callable[[str, list[Union[str, int]]], str],
    list[StringNodeRule],
    StringNodeProcessor,
]

164 

165 

def _get_node_processor(replacer: ReplacerType) -> StringNodeProcessor:
    """Turn a replacer into a string node processor.

    A list is wrapped in a ``RuleNodeProcessor`` and a callable in a
    ``CallableNodeProcessor``; anything else is returned unchanged.
    """
    if isinstance(replacer, list):
        return RuleNodeProcessor(rules=replacer)
    if callable(replacer):
        return CallableNodeProcessor(func=replacer)
    return replacer

173 

174 

def create_anonymizer(
    replacer: ReplacerType,
    *,
    max_depth: Optional[int] = None,
) -> Callable[[Any], Any]:
    """Build a function that masks the strings found inside a piece of data.

    Args:
        replacer: Rules, callable or processor that decides how strings are masked.
        max_depth: Maximum nesting depth to search for strings; a falsy value
            (``None`` or ``0``) means 10.

    Returns:
        A function taking the data and returning it with masked strings
        written back. Nested containers are updated in place; when the data is
        itself a string, the masked string is returned instead.
    """
    processor = _get_node_processor(replacer)

    def anonymizer(data: Any) -> Any:
        extracted = _extract_string_nodes(data, {"max_depth": max_depth or 10})
        output = data

        for masked in processor.mask_nodes(extracted):
            path = masked["path"]
            if not path:
                # An empty path means the root value itself was a string.
                output = masked["value"]
                continue

            # Walk down to the container holding the string, then overwrite it.
            container = output
            for step in path[:-1]:
                container = container[step]
            container[path[-1]] = masked["value"]

        return output

    return anonymizer