Coverage for langsmith/_internal/_aiter.py: 0%

146 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-12-11 16:15 -0800

1"""Adapted. 

2 

3Original source: 

4https://github.com/maxfischer2781/asyncstdlib/blob/master/asyncstdlib/itertools.py 

5MIT License 

6""" 

7 

8import asyncio 

9import contextvars 

10import functools 

11import inspect 

12from collections import deque 

13from collections.abc import ( 

14 AsyncGenerator, 

15 AsyncIterable, 

16 AsyncIterator, 

17 Awaitable, 

18 Coroutine, 

19 Iterable, 

20 Iterator, 

21) 

22from contextlib import AbstractAsyncContextManager 

23from typing import ( 

24 Any, 

25 Callable, 

26 Generic, 

27 Optional, 

28 TypeVar, 

29 Union, 

30 cast, 

31 overload, 

32) 

33 

34T = TypeVar("T") 

35 

36_no_default = object() 

37 

38 

39# https://github.com/python/cpython/blob/main/Lib/test/test_asyncgen.py#L54 

40# before 3.10, the builtin anext() was not available 

def py_anext(
    iterator: AsyncIterator[T], default: Union[T, Any] = _no_default
) -> Awaitable[Union[T, None, Any]]:
    """Return an awaitable that produces the next item of ``iterator``.

    Pure-Python stand-in for the builtin ``anext()``, which is not available
    before Python 3.10.

    Args:
        iterator: The async iterator to advance.
        default: Value the awaitable resolves to when ``iterator`` is
            exhausted. When omitted, exhaustion surfaces as
            ``StopAsyncIteration`` from the returned awaitable.

    Returns:
        The awaitable created by ``__anext__`` itself when no default is
        given; otherwise a coroutine wrapping it that substitutes ``default``
        on exhaustion.

    Raises:
        TypeError: If the type of ``iterator`` does not define ``__anext__``.
    """
    iterator_type = type(iterator)
    try:
        # The method is looked up on the type, not on the instance.
        advance = cast(
            Callable[[AsyncIterator[T]], Awaitable[T]], iterator_type.__anext__
        )
    except AttributeError:
        raise TypeError(f"{iterator!r} is not an async iterator")

    if default is not _no_default:

        async def anext_impl() -> Union[T, Any]:
            # Only exhaustion is translated into the default; every other
            # exception raised by the iterator propagates unchanged.
            try:
                return await advance(iterator)
            except StopAsyncIteration:
                return default

        return anext_impl()

    # Without a default there is nothing to wrap: hand back the raw awaitable.
    return advance(iterator)

73 

74 

class NoLock:
    """No-op async context manager usable wherever a lock is expected.

    Entering and leaving never wait on anything and give no mutual exclusion.
    """

    async def __aenter__(self) -> None:
        """Enter the context immediately; there is nothing to acquire."""
        return None

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool:
        """Leave the context without suppressing an in-flight exception."""
        suppress_exception = False
        return suppress_exception

83 

84 

async def tee_peer(
    iterator: AsyncIterator[T],
    # the buffer specific to this peer
    buffer: deque[T],
    # the buffers of all peers, including our own
    peers: list[deque[T]],
    lock: AbstractAsyncContextManager[Any],
) -> AsyncGenerator[T, None]:
    """Yield the items of ``iterator`` as seen by one peer of a tee.

    Args:
        iterator: The shared source iterator.
        buffer: Queue of items fetched from the source but not yet yielded
            by this peer.
        peers: Buffers of every peer that is still active, this one included.
            The list is mutated: this peer removes its own buffer on exit.
        lock: Async context manager held while fetching from the source and
            while deregistering this peer.

    Yields:
        Items of ``iterator`` in source order.
    """
    try:
        while True:
            if not buffer:
                async with lock:
                    # A sibling may have filled our buffer while we waited
                    # for the lock; go around again and yield from it.
                    if buffer:
                        continue
                    try:
                        fetched = await iterator.__anext__()
                    except StopAsyncIteration:
                        break
                    # Fan the item out to every peer, ourselves included, and
                    # read it back from our own buffer below. Going through
                    # the buffer keeps ordering intact when siblings have
                    # queued items for us concurrently.
                    for queue in peers:
                        queue.append(fetched)
            yield buffer.popleft()
    finally:
        async with lock:
            # This peer is done. Locate its buffer by identity: deques
            # compare equal by content, so an equality search could match a
            # sibling's buffer instead.
            own_index = next(
                (pos for pos, queue in enumerate(peers) if queue is buffer),
                None,
            )
            if own_index is not None:
                del peers[own_index]
            # The last peer to leave closes the source, if it can be closed.
            if not peers and hasattr(iterator, "aclose"):
                await iterator.aclose()

125 

126 

class Tee(Generic[T]):
    """Split one async iterable into ``n`` independent async iterators.

    Every child iterator yields the same items in the same order. Children
    advance independently: an item fetched from the source on behalf of one
    child is appended to the buffer of every child and stays there until that
    child yields it. Items are fetched only on demand, so an infinite source
    works as long as no child falls arbitrarily far behind.

    ```python
    async with Tee(source, n=2) as (first, second):
        head = await first.__anext__()  # ``second`` still yields ``head``
    ```

    The object behaves like a read-only sequence of its children: it supports
    ``len()``, indexing, slicing, iteration and unpacking. :py:meth:`aclose`
    closes all children, and leaving an ``async with`` block does the same.

    Items read from the source by anyone else are not seen by the children.
    If the children drift far apart, the buffered backlog grows accordingly.

    Each child awaits ``__anext__`` of the source while holding ``lock``.
    Without a ``lock`` a no-op placeholder is used, so children driven
    concurrently may call into the source concurrently; pass e.g. an
    :py:class:`asyncio.Lock` to serialise those fetches.
    """

    def __init__(
        self,
        iterable: AsyncIterator[T],
        n: int = 2,
        *,
        lock: Optional[AbstractAsyncContextManager[Any]] = None,
    ):
        # before 3.10 aiter() doesn't exist, hence the direct dunder call
        self._iterator = iterable.__aiter__()
        self._buffers: list[deque[T]] = [deque() for _ in range(n)]
        children = []
        for own_buffer in self._buffers:
            peer_lock = lock if lock is not None else NoLock()
            children.append(
                tee_peer(
                    iterator=self._iterator,
                    buffer=own_buffer,
                    peers=self._buffers,
                    lock=peer_lock,
                )
            )
        self._children = tuple(children)

    def __len__(self) -> int:
        """Return the number of child iterators."""
        return len(self._children)

    @overload
    def __getitem__(self, item: int) -> AsyncIterator[T]: ...

    @overload
    def __getitem__(self, item: slice) -> tuple[AsyncIterator[T], ...]: ...

    def __getitem__(
        self, item: Union[int, slice]
    ) -> Union[AsyncIterator[T], tuple[AsyncIterator[T], ...]]:
        """Return one child for an index, or a tuple of children for a slice."""
        selected = self._children[item]
        return selected

    def __iter__(self) -> Iterator[AsyncIterator[T]]:
        """Iterate over the children, which also enables unpacking."""
        for child in self._children:
            yield child

    async def __aenter__(self) -> "Tee[T]":
        """Enter the context and hand back this tee."""
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool:
        """Close all children; an in-flight exception is never suppressed."""
        await self.aclose()
        return False

    async def aclose(self) -> None:
        """Close every child iterator, one after the other."""
        for child in self._children:
            await child.aclose()

210 

211 

212atee = Tee 

213 

214 

async def async_zip(*async_iterables):
    """Async version of zip.

    All inputs are advanced concurrently, one round at a time, and each round
    yields one tuple holding the next item of every input.

    Args:
        *async_iterables: The async iterables to combine.

    Yields:
        Tuples with one item per input, in argument order. Iteration ends as
        soon as any input is exhausted; items fetched from the other inputs
        during that final round are discarded. With no inputs nothing is
        yielded, matching the builtin ``zip()``.
    """
    # Before Python 3.10, aiter() was not available
    iterators = [iterable.__aiter__() for iterable in async_iterables]
    if not iterators:
        # gather() without awaitables resolves to an empty list right away,
        # so the loop below would yield empty tuples forever.
        return
    while True:
        # Guard only the fetch: an exception thrown into the generator at the
        # yield below must not be mistaken for an exhausted input.
        try:
            items = await asyncio.gather(
                *(py_anext(iterator) for iterator in iterators)
            )
        except StopAsyncIteration:
            return
        yield tuple(items)

227 

228 

class _AsyncIteratorWrapper:
    """Expose a synchronous iterable through the async iterator protocol."""

    def __init__(self, iterable: Iterable):
        self._iterator = iter(iterable)

    async def __anext__(self):
        try:
            return next(self._iterator)
        except StopIteration:
            # Translate exhaustion into its async counterpart; the original
            # StopIteration adds nothing useful to the traceback.
            raise StopAsyncIteration from None

    def __aiter__(self):
        return self


def ensure_async_iterator(
    iterable: Union[Iterable, AsyncIterable],
) -> AsyncIterator:
    """Return an async iterator over ``iterable``.

    Args:
        iterable: An async iterator, an async iterable, or a plain
            synchronous iterable.

    Returns:
        ``iterable`` itself when it already has ``__anext__``, the result of
        its ``__aiter__()`` when it has one, and otherwise a wrapper that
        serves the items of ``iter(iterable)`` through ``__anext__``.

    Raises:
        TypeError: If ``iterable`` supports neither protocol, raised by the
            ``iter()`` call.
    """
    if hasattr(iterable, "__anext__"):
        return cast(AsyncIterator, iterable)
    if hasattr(iterable, "__aiter__"):
        return cast(AsyncIterator, iterable.__aiter__())
    # The wrapper class lives at module level so it is built once, not on
    # every call, and all wrappers share one type.
    return _AsyncIteratorWrapper(iterable)

252 

253 

def aiter_with_concurrency(
    n: Optional[int],
    generator: AsyncIterator[Coroutine[None, None, T]],
    *,
    _eager_consumption_timeout: float = 0,
) -> AsyncGenerator[T, None]:
    """Await the coroutines produced by ``generator`` with bounded parallelism.

    Args:
        n: Maximum number of coroutines running at once. ``0`` awaits them
            one by one in source order without creating tasks; ``None``
            places no limit.
        generator: Async iterator producing the coroutines to await.
        _eager_consumption_timeout: When greater than zero, after each new
            task is scheduled wait up to this many seconds for tasks to
            finish and yield whatever completed in that window.

    Returns:
        An async generator yielding the coroutine results. Unless ``n`` is
        ``0``, results come out in completion order, not source order.
    """
    if n == 0:

        async def consume():
            async for coro in generator:
                yield await coro

        return consume()

    if n is None:
        # Unbounded: the no-op lock stands in for the semaphore.
        semaphore = cast(asyncio.Semaphore, NoLock())
    else:
        semaphore = cast(asyncio.Semaphore, asyncio.Semaphore(n))

    async def process_item(ix: int, item):
        # Hand the index back with the result so the finished task can be
        # removed from the bookkeeping dict.
        async with semaphore:
            outcome = await item
            return (ix, outcome)

    async def process_generator():
        pending = {}
        context_supported = asyncio_accepts_context()
        next_ix = 0
        async for coro in generator:
            # Each task gets its own snapshot of the context, taken at the
            # moment it is scheduled.
            task_options = (
                {"context": contextvars.copy_context()} if context_supported else {}
            )
            pending[next_ix] = asyncio.create_task(
                process_item(next_ix, coro), **task_options
            )
            next_ix += 1

            if _eager_consumption_timeout > 0:
                try:
                    for finished in asyncio.as_completed(
                        pending.values(),
                        timeout=_eager_consumption_timeout,
                    ):
                        done_ix, result = await finished
                        yield result
                        del pending[done_ix]
                except asyncio.TimeoutError:
                    # Nothing more finished within the window; carry on.
                    pass

            if n is None or len(pending) < n:
                continue

            # At capacity: wait for at least one task before pulling more.
            completed, _ = await asyncio.wait(
                pending.values(), return_when=asyncio.FIRST_COMPLETED
            )
            for finished_task in completed:
                done_ix, result = finished_task.result()
                yield result
                del pending[done_ix]

        # Source exhausted: drain whatever is still in flight.
        for finished in asyncio.as_completed(pending.values()):
            _, result = await finished
            yield result

    return process_generator()

326 

327 

def accepts_context(callable: Callable[..., Any]) -> bool:
    """Report whether ``callable`` declares a parameter named ``context``.

    Returns ``False`` when ``inspect.signature`` raises ``ValueError`` for
    ``callable``.
    """
    try:
        signature = inspect.signature(callable)
    except ValueError:
        return False
    return "context" in signature.parameters

334 

335 

336# Ported from Python 3.9+ to support Python 3.8 

async def aio_to_thread(
    func, /, *args, __ctx: Optional[contextvars.Context] = None, **kwargs
):
    """Asynchronously run function *func* in a separate thread.

    Any *args and **kwargs supplied for this function are directly passed
    to *func*. The call runs inside a :class:`contextvars.Context`, so
    context variables are visible in the worker thread.

    Args:
        func: The callable to run in the loop's default executor.
        *args: Positional arguments for *func*.
        __ctx: Context to run *func* in. When ``None``, a copy of the
            caller's current context is used.
        **kwargs: Keyword arguments for *func*.

    Returns:
        Whatever *func* returns.
    """
    loop = asyncio.get_running_loop()
    # Compare against None explicitly: Context is a Mapping, so an empty
    # context passed on purpose is falsy and must not be replaced.
    ctx = contextvars.copy_context() if __ctx is None else __ctx
    func_call = functools.partial(ctx.run, func, *args, **kwargs)
    return await loop.run_in_executor(None, func_call)

353 

354 

@functools.lru_cache(maxsize=1)
def asyncio_accepts_context():
    """Tell whether ``asyncio.create_task`` takes a ``context`` argument.

    The signature is inspected on the first call only; the answer is cached.
    """
    create_task = asyncio.create_task
    return accepts_context(create_task)