Coverage for langsmith/_internal/_aiter.py: 0%
146 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-12-11 16:15 -0800
« prev ^ index » next coverage.py v7.10.1, created at 2025-12-11 16:15 -0800
"""Adapted.

Original source:
https://github.com/maxfischer2781/asyncstdlib/blob/master/asyncstdlib/itertools.py
MIT License
"""
8import asyncio
9import contextvars
10import functools
11import inspect
12from collections import deque
13from collections.abc import (
14 AsyncGenerator,
15 AsyncIterable,
16 AsyncIterator,
17 Awaitable,
18 Coroutine,
19 Iterable,
20 Iterator,
21)
22from contextlib import AbstractAsyncContextManager
23from typing import (
24 Any,
25 Callable,
26 Generic,
27 Optional,
28 TypeVar,
29 Union,
30 cast,
31 overload,
32)
T = TypeVar("T")

# Sentinel meaning "no default was supplied"; ``None`` cannot play this role
# because it is a legitimate default value.
_no_default = object()


# https://github.com/python/cpython/blob/main/Lib/test/test_asyncgen.py#L54
# before 3.10, the builtin anext() was not available
def py_anext(
    iterator: AsyncIterator[T], default: Union[T, Any] = _no_default
) -> Awaitable[Union[T, None, Any]]:
    """Pure-Python counterpart of the builtin ``anext()``.

    Args:
        iterator: The async iterator to advance. ``__anext__`` is looked up
            on its type, not on the instance.
        default: Value to produce once ``iterator`` is exhausted. When it is
            omitted, exhaustion surfaces as ``StopAsyncIteration``.

    Returns:
        An awaitable that resolves to the next item, or to ``default`` when
        one was given and the iterator is exhausted.

    Raises:
        TypeError: If the type of ``iterator`` has no ``__anext__``.
    """
    try:
        advance = cast(
            Callable[[AsyncIterator[T]], Awaitable[T]], type(iterator).__anext__
        )
    except AttributeError:
        raise TypeError(f"{iterator!r} is not an async iterator")

    # Without a default there is nothing to intercept, so hand back the
    # iterator's own awaitable unwrapped.
    if default is _no_default:
        return advance(iterator)

    async def anext_impl() -> Union[T, Any]:
        # The C implementation of anext() is far more low-level and implements
        # the whole iterator protocol; this version deliberately relies on
        # plain coroutine semantics instead.
        try:
            return await advance(iterator)
        except StopAsyncIteration:
            return default

    return anext_impl()
class NoLock:
    """Stand-in for an async lock: same ``async with`` protocol, no exclusion."""

    async def __aenter__(self) -> None:
        # Nothing to acquire; ``async with NoLock() as x`` binds ``x`` to None.
        return None

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool:
        # A falsy result lets any exception raised in the body propagate.
        return False
async def tee_peer(
    iterator: AsyncIterator[T],
    # the buffer specific to this peer
    buffer: deque[T],
    # the buffers of all peers, including our own
    peers: list[deque[T]],
    lock: AbstractAsyncContextManager[Any],
) -> AsyncGenerator[T, None]:
    """Yield the items of ``iterator`` as one peer of a :py:func:`~.tee`.

    Args:
        iterator: The shared source all peers read from.
        buffer: This peer's own queue of items it has not yielded yet.
        peers: The queues of every live peer, including ``buffer``.
        lock: Async context manager entered around each read of ``iterator``
            and around the cleanup when this peer finishes.

    Yields:
        The items of ``iterator``, in order.
    """
    try:
        while True:
            if not buffer:
                async with lock:
                    # Re-check under the lock: another peer may have produced
                    # an item for us while we were waiting to enter it.
                    if not buffer:
                        try:
                            fetched = await iterator.__anext__()
                        except StopAsyncIteration:
                            break
                        # Hand the item to every peer, ourselves included. We
                        # read our copy back from the buffer below instead of
                        # yielding it directly, which keeps the ordering right
                        # when peers fetch concurrently and may already have
                        # buffered an item for us.
                        for peer_queue in peers:
                            peer_queue.append(fetched)
            yield buffer.popleft()
    finally:
        async with lock:
            # This peer is done: drop its buffer, matched by identity.
            for position, peer_queue in enumerate(peers):  # pragma: no branch
                if peer_queue is buffer:
                    del peers[position]
                    break
            # The last peer to finish closes the source if it can be closed.
            if not peers and hasattr(iterator, "aclose"):
                await iterator.aclose()
class Tee(Generic[T]):
    """Create ``n`` separate asynchronous iterators over ``iterable``.

    A single ``iterable`` is split into several child iterators that each
    provide the same items in the same order. The children may advance
    independently but share the same items from ``iterable``: once the most
    advanced child retrieves an item, it stays buffered until the least
    advanced child has yielded it too. A ``tee`` is lazy and can handle an
    infinite ``iterable``, provided that all children advance.

    ```python
    async def derivative(sensor_data):
        previous, current = a.tee(sensor_data, n=2)
        await a.anext(previous)  # advance one iterator
        return a.map(operator.sub, previous, current)
    ```

    Unlike :py:func:`itertools.tee`, :py:func:`~.tee` returns a custom type
    instead of a :py:class:`tuple`. Like a tuple, it can be indexed, iterated
    and unpacked to get the child iterators. In addition, its
    :py:meth:`~.tee.aclose` method immediately closes all children, and it can
    be used in an ``async with`` context for the same effect.

    If ``iterable`` is an iterator and read elsewhere, ``tee`` will *not*
    provide these items. Also, ``tee`` must internally buffer each item until
    the last iterator has yielded it; if the most and least advanced iterator
    differ by most data, using a :py:class:`list` is more efficient (but not
    lazy).

    If the underlying iterable is concurrency safe (``anext`` may be awaited
    concurrently) the resulting iterators are concurrency safe as well.
    Otherwise, the iterators are safe if there is only ever one single "most
    advanced" iterator. To enforce sequential use of ``anext``, provide a
    ``lock`` - e.g. an :py:class:`asyncio.Lock` instance in an
    :py:mod:`asyncio` application - and access is automatically synchronised.
    """

    def __init__(
        self,
        iterable: AsyncIterator[T],
        n: int = 2,
        *,
        lock: Optional[AbstractAsyncContextManager[Any]] = None,
    ):
        # before 3.10 aiter() doesn't exist
        self._iterator = iterable.__aiter__()
        # One queue per child; the list itself is shared so that every child
        # can see (and leave) the set of live peers.
        self._buffers: list[deque[T]] = [deque() for _ in range(n)]
        self._children = tuple(
            tee_peer(
                iterator=self._iterator,
                buffer=own_buffer,
                peers=self._buffers,
                lock=NoLock() if lock is None else lock,
            )
            for own_buffer in self._buffers
        )

    def __len__(self) -> int:
        # Number of child iterators created, whether or not they have finished.
        return len(self._children)

    @overload
    def __getitem__(self, item: int) -> AsyncIterator[T]: ...

    @overload
    def __getitem__(self, item: slice) -> tuple[AsyncIterator[T], ...]: ...

    def __getitem__(
        self, item: Union[int, slice]
    ) -> Union[AsyncIterator[T], tuple[AsyncIterator[T], ...]]:
        # Delegates to the underlying tuple, so ints and slices behave as usual.
        return self._children[item]

    def __iter__(self) -> Iterator[AsyncIterator[T]]:
        # Plain (synchronous) iteration hands out the children, which is what
        # makes ``first, second = Tee(...)`` work.
        for child in self._children:
            yield child

    async def __aenter__(self) -> "Tee[T]":
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool:
        await self.aclose()
        # Never suppress an exception raised inside the ``async with`` body.
        return False

    async def aclose(self) -> None:
        """Close every child iterator, one after the other."""
        for peer in self._children:
            await peer.aclose()


atee = Tee
async def async_zip(*async_iterables):
    """Async version of zip.

    Advances every iterable concurrently and yields one tuple per round,
    stopping as soon as any of them is exhausted. Like the builtin ``zip()``,
    it yields nothing at all when called without iterables.

    Args:
        *async_iterables: The async iterables to combine.

    Yields:
        Tuples holding one item from each iterable, in argument order.
    """
    # Before Python 3.10, aiter() was not available
    iterators = [iterable.__aiter__() for iterable in async_iterables]
    if not iterators:
        # ``asyncio.gather()`` without awaitables resolves to ``[]`` right
        # away, so without this guard the loop below would yield ``()``
        # forever instead of finishing.
        return
    while True:
        # Only the gather may signal exhaustion; the ``yield`` stays outside
        # the ``try`` so a StopAsyncIteration thrown in by the consumer is not
        # mistaken for the end of an input.
        try:
            items = await asyncio.gather(
                *(iterator.__anext__() for iterator in iterators)
            )
        except StopAsyncIteration:
            return
        yield tuple(items)
def ensure_async_iterator(
    iterable: Union[Iterable, AsyncIterable],
) -> AsyncIterator:
    """Return an async iterator over ``iterable``.

    Args:
        iterable: An async iterator, an async iterable, or a plain iterable.

    Returns:
        ``iterable`` itself when it has ``__anext__``; the result of its
        ``__aiter__()`` when it has that; otherwise a small wrapper that
        serves the items of the synchronous iterable through ``__anext__``.
    """
    # Already an async iterator: hand it back untouched.
    if hasattr(iterable, "__anext__"):
        return cast(AsyncIterator, iterable)

    # An async iterable: ask it for its iterator.
    if hasattr(iterable, "__aiter__"):
        return cast(AsyncIterator, iterable.__aiter__())

    # Anything else is treated as a synchronous iterable and adapted.
    class AsyncIteratorWrapper:
        def __init__(self, iterable: Iterable):
            self._iterator = iter(iterable)

        def __aiter__(self):
            return self

        async def __anext__(self):
            # Translate the synchronous end-of-iteration signal into the
            # asynchronous one.
            try:
                value = next(self._iterator)
            except StopIteration:
                raise StopAsyncIteration
            return value

    return AsyncIteratorWrapper(iterable)
def aiter_with_concurrency(
    n: Optional[int],
    generator: AsyncIterator[Coroutine[None, None, T]],
    *,
    _eager_consumption_timeout: float = 0,
) -> AsyncGenerator[T, None]:
    """Process async generator with max parallelism.

    Args:
        n: The number of tasks to run concurrently. ``0`` awaits the
            coroutines one at a time, in order; ``None`` applies no limit.
        generator: The async generator producing the coroutines to run.
        _eager_consumption_timeout: If greater than zero, after scheduling
            each coroutine wait up to this many seconds for scheduled tasks
            to finish and yield their results right away. This can be used to
            consume the generator eagerly while still respecting the
            concurrency limit.

    Yields:
        The results of the coroutines. Except for ``n == 0``, they arrive in
        completion order rather than in submission order.
    """
    if n == 0:
        # Strictly sequential: await each coroutine before pulling the next.
        async def consume():
            async for coro in generator:
                yield await coro

        return consume()

    # With no limit, a NoLock stands in for the semaphore so the worker below
    # can use ``async with`` unconditionally.
    limiter = cast(
        asyncio.Semaphore, NoLock() if n is None else asyncio.Semaphore(n)
    )

    async def process_item(ix: int, item):
        # Tag the result with its index so the caller can tell which task
        # finished.
        async with limiter:
            outcome = await item
        return (ix, outcome)

    async def process_generator():
        pending = {}  # index -> task that has not been yielded yet
        pass_context = asyncio_accepts_context()
        next_ix = 0
        async for coro in generator:
            if pass_context:
                snapshot = contextvars.copy_context()
                scheduled = asyncio.create_task(
                    process_item(next_ix, coro), context=snapshot
                )
            else:
                scheduled = asyncio.create_task(process_item(next_ix, coro))
            pending[next_ix] = scheduled
            next_ix += 1

            if _eager_consumption_timeout > 0:
                # Opportunistically drain whatever finishes within the
                # timeout; running out of time just means "nothing more yet".
                try:
                    for finished in asyncio.as_completed(
                        pending.values(),
                        timeout=_eager_consumption_timeout,
                    ):
                        done_ix, result = await finished
                        yield result
                        del pending[done_ix]
                except asyncio.TimeoutError:
                    pass

            if n is not None and len(pending) >= n:
                # At capacity: wait for at least one task to finish before
                # pulling the next coroutine from the generator.
                finished_tasks, _ = await asyncio.wait(
                    pending.values(), return_when=asyncio.FIRST_COMPLETED
                )
                for finished_task in finished_tasks:
                    done_ix, result = finished_task.result()
                    yield result
                    del pending[done_ix]

        # The generator is exhausted: yield what is left as it completes.
        for finished in asyncio.as_completed(pending.values()):
            _, result = await finished
            yield result

    return process_generator()
def accepts_context(callable: Callable[..., Any]) -> bool:
    """Report whether ``callable`` declares a parameter named ``context``.

    Returns ``False`` as well when ``inspect.signature`` raises
    ``ValueError`` for ``callable``.
    """
    try:
        parameters = inspect.signature(callable).parameters
    except ValueError:
        return False
    return "context" in parameters
# Ported from Python 3.9+ to support Python 3.8
async def aio_to_thread(
    func, /, *args, __ctx: Optional[contextvars.Context] = None, **kwargs
):
    """Asynchronously run function *func* in a separate thread.

    Any *args and **kwargs supplied for this function are directly passed
    to *func*. Also, a :class:`contextvars.Context` is propagated, allowing
    context variables from the calling code to be accessed in the separate
    thread.

    Args:
        func: The callable to run in the loop's default executor.
        *args: Positional arguments for *func*.
        __ctx: Context to run *func* in. When it is ``None``, a copy of the
            current context is used. A supplied context is used as given,
            even when it is empty.
        **kwargs: Keyword arguments for *func*.

    Returns:
        Whatever *func* returns.
    """
    loop = asyncio.get_running_loop()
    # Compare against None instead of relying on truthiness: ``Context`` is a
    # mapping, so an explicitly supplied but empty context is falsy and would
    # otherwise be silently replaced by a copy of the caller's context.
    ctx = __ctx if __ctx is not None else contextvars.copy_context()
    func_call = functools.partial(ctx.run, func, *args, **kwargs)
    return await loop.run_in_executor(None, func_call)
@functools.lru_cache(maxsize=1)
def asyncio_accepts_context():
    """Report whether ``asyncio.create_task`` takes a ``context`` argument.

    The answer comes from ``accepts_context`` and is memoised by
    ``lru_cache``, so the signature is inspected only on the first call.
    """
    supports_context = accepts_context(asyncio.create_task)
    return supports_context