How can I limit the number of requests made per second?
In this comment @florimondmanca suggests that the connection pool limits can be used to rate-limit the number of requests made. I can see, of course, how setting those limits will constrain the number of 'concurrent' requests made by httpx. However, what if I want to limit the number of requests made per unit of time?
Say the API I'm hitting allows 10 requests per second, I guess a naive approach would be to set the hard limit of the connection pool to 10 and asyncio.sleep 1/10 s after every request. Does that sound about right or am I missing something obvious?
There's no built-in API for that, no.
You might want to take a look at https://github.com/python-trio/trimeter, which has some really nice generic tools for capacity limiting.
Say the API I'm hitting allows 10 requests per second, I guess a naive approach would be to set the hard limit of the connection pool to 10 and asyncio.sleep 1/10 s after every request. Does that sound about right or am I missing something obvious?
Yeah, something like that could be perfectly reasonable (minus however much time has elapsed making the request).
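A minimal sketch of that naive approach (stdlib only; `RATE` and the wrapped request coroutine are stand-ins): time the request, then sleep out whatever is left of the 1/RATE slot.

```python
import asyncio
import time

RATE = 10  # hypothetical limit: requests per second


async def throttled(coro_fn, *args, **kwargs):
    """Await coro_fn(...), then sleep out the remainder of the 1/RATE slot."""
    start = time.monotonic()
    result = await coro_fn(*args, **kwargs)
    elapsed = time.monotonic() - start
    # Only sleep for whatever part of the slot the request didn't already use.
    await asyncio.sleep(max(0.0, 1 / RATE - elapsed))
    return result
```

Note this only paces the task that calls it; with several concurrent tasks you still need something global, like the semaphore-based client further down this thread.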
They warn that the library is not production-ready, but this example here looks perfect for me.
async with trimeter.amap(
    fetch_and_process, urls, max_at_once=10, max_per_second=10
) as results:
    # Then iterate over the return values, as they become available
    # (i.e., not necessarily in the original order)
    async for result in results:
        ...
Failed to install trimeter in a conda environment but at least I'll know to watch the package. Thanks!
I just wanted to comment that I'm running into this exact scenario:
r.status_code=429, r.text='You have exceeded the limit of 15 requests per second; please reduce your concurrent connections'
It'd be really nice to have a max_per_second or similar parameter in the client. 😄
Have a look at https://github.com/florimondmanca/aiometer it has served me well in most cases.
Oh sweet, that looks more mature than trimeter. Thank you!
It would be really helpful to have this API within httpx, even if it used something like aiometer internally. Using aiometer externally completely changes the way you interact with httpx and makes it much harder to write a client library. Being able to set something on the client, then forget about it and use your normal access pattern, would be a huge leap in ergonomics. @Midnighter could you reopen this issue?
I suppose this could be solved without changes to httpx required, by writing a custom transport that wraps the default one with calls to aiometer?
I suppose this could be solved without changes to httpx required, by writing a custom transport that wraps the default one with calls to aiometer?
Certainly possible!
Not very experienced with asyncio but here's my stab (having gone through just about every synchronisation primitive it has to offer...).
Rate limits are generally defined as N requests per interval I, so this subclass of AsyncClient wraps a semaphore with N slots. When send is scheduled, it has to wait until the semaphore becomes available, and then eagerly schedules two tasks: one to release the semaphore after I has expired, and one to send the request. This ensures both that the response can be returned as soon as it's available (so we don't have to sleep after every request if we're not going to hit the limit) and that we don't exceed the limit. I think that creating a task out of the super().send coroutine means there shouldn't be any waiting before it gets sent off (which would cause problems); or at least, creating the waiter task second should mean it shows up later in the queue.
import asyncio
import datetime as dt
from functools import wraps
from typing import Union

from httpx import AsyncClient

# Unless you keep a strong reference to a running task, it can be dropped
# (garbage-collected) during execution:
# https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
_background_tasks = set()


class RateLimitedClient(AsyncClient):
    """httpx.AsyncClient with a rate limit."""

    def __init__(self, interval: Union[dt.timedelta, float], count=1, **kwargs):
        """
        Parameters
        ----------
        interval : Union[dt.timedelta, float]
            Length of interval.
            If a float is given, seconds are assumed.
        count : int, optional
            Number of requests which can be sent in any given interval
            (default 1).
        """
        if isinstance(interval, dt.timedelta):
            interval = interval.total_seconds()
        self.interval = interval
        self.semaphore = asyncio.Semaphore(count)
        super().__init__(**kwargs)

    def _schedule_semaphore_release(self):
        wait = asyncio.create_task(asyncio.sleep(self.interval))
        _background_tasks.add(wait)

        def wait_cb(task):
            self.semaphore.release()
            _background_tasks.discard(task)

        wait.add_done_callback(wait_cb)

    @wraps(AsyncClient.send)
    async def send(self, *args, **kwargs):
        await self.semaphore.acquire()
        send = asyncio.create_task(super().send(*args, **kwargs))
        self._schedule_semaphore_release()
        return await send
Since this topic remains popular, I'll mention another possible solution that might integrate better with existing httpx usage flows: using a rate-limiter context manager, say from limiter or aiolimiter, within an httpx.Auth subclass.
Something like this (untested code):
import aiolimiter
import httpx


class HttpxRateLimiter(httpx.Auth):
    def __init__(self):
        # 600 requests per default 60 s window, i.e. 10 per second
        self.limiter = aiolimiter.AsyncLimiter(600)

    async def async_auth_flow(self, request):
        async with self.limiter:
            yield request
Although HTTPX is built with speed in mind, it is important to have a simple way to define a rate limit (requests per unit of time). In my experience with HTTPX I faced rate-limit errors, and my workaround was to adjust httpx.Limits(max_keepalive_connections, max_connections), pass it to httpx.AsyncClient, and keep tweaking until the rate-limit error no longer occurred.
Since this topic keeps being popular, I'll mention another possible solution that might integrate better with already existing httpx usage flows: using a rate limiter context manager, say from limiter or aiolimiter, within an httpx.Auth subclass.
Something like (Untested code):
import aiolimiter
import httpx


class HttpxRateLimiter(httpx.Auth):
    def __init__(self):
        self.limiter = aiolimiter.AsyncLimiter(600)  # <- 10 per second

    async def async_auth_flow(self, request):
        async with self.limiter:
            yield request
I am not familiar with the inner architecture of HTTPX, so apologies in advance for my question below.
Why not have HttpxRateLimiter (I would suggest the name HttpxRateLimiterClient) inherit from httpx.AsyncClient? From the user's perspective, that would be simpler.
I just had another brief look, and if the limiter works as advertised, it provides a really simple interface for implementing rate limiting directly on a transport. To be honest, I don't understand why one would implement it on top of httpx.Auth only.
I'm not sure whether inheritance or composition is better in this scenario, but I'm going with composition as shown in the httpx docs. The code below is untested; consider it pseudo-code.
import httpx
from aiolimiter import AsyncLimiter


class AsyncRateLimitedTransport(httpx.AsyncBaseTransport):
    def __init__(self, *, max_rate: float, time_period: float = 60.0, **kwargs):
        self._transport = httpx.AsyncHTTPTransport(**kwargs)
        self._limiter = AsyncLimiter(max_rate=max_rate, time_period=time_period)

    async def handle_async_request(self, request):
        # Each request first acquires capacity from the limiter.
        async with self._limiter:
            return await self._transport.handle_async_request(request)

    async def aclose(self):
        await self._transport.aclose()


client = httpx.AsyncClient(
    transport=AsyncRateLimitedTransport(max_rate=10, time_period=1.0)
)
Add some testing, documentation, and a sync version, and that's a small but extremely useful package right there, I think.
Or do you see a place for such a class in httpx @tomchristie? Otherwise, I'll give making a package a shot later on.
I have just published the first version of https://pypi.org/project/httpx-limiter/. I would appreciate any testing that anyone here can do. Just create an issue if you find something 😌
Or do you see a place for such a class
@Midnighter Heya. Nicely done, thanks.
I'm not sure... it's obviously a really nice piece of behaviour. Perhaps a good starting point would be for us to document it as an example of a custom transport, since it composes so nicely? Having a sensible rate limit on by default might be pretty neat, though I don't feel ready to take a call on that just yet.
Fair enough. It's such a tiny dependency that it shouldn't hurt to add to a project. Let's see what folks think and if it does become very popular, you can always re-evaluate.
Having it mentioned in the httpx main docs would be fantastic, of course!
Would you like me to make a PR with the change to the docs? I'm not clear on what you have in mind, though.
I wrote a sync version based on @Midnighter's code:
import httpx
from limiter import Limiter, Tokens  # assuming the third-party "limiter" package


class RateLimitedTransport(httpx.BaseTransport):
    def __init__(self, *, limiter: Limiter, transport: httpx.BaseTransport, **kwargs):
        super().__init__(**kwargs)
        self._limiter = limiter
        self._transport = transport

    @classmethod
    def create(
        cls, *, rate: Tokens, capacity: Tokens, consume: Tokens | None = None, **kwargs
    ) -> "RateLimitedTransport":
        return cls(
            limiter=Limiter(rate=rate, capacity=capacity, consume=consume),
            transport=httpx.HTTPTransport(**kwargs),
        )

    def handle_request(self, request: httpx.Request) -> httpx.Response:
        with self._limiter:
            return self._transport.handle_request(request)
I found this tip in https://github.com/encode/httpx/discussions/2989.
I think it would be fantastic if httpx included both sync and async versions, with a dependency on limiter when installed as httpx[limiter] or something like that. At least it would be great to document it... Finding these gems is really much harder than it should be :(
I can submit a docs PR if @tomchristie thinks rate limiters cannot be accepted even with an optional dependency.
@zyv I had it on my mind to add a sync version, too, but I wanted to wait for a compelling use-case. What's your application today? Do you use a thread pool?
Please note that I'm testing alternative implementations of the leaky bucket algorithm. The current dependency only supports rates per second, but there is a need for per minute, hour, day as well.
@zyv I had it on my mind to add a sync version, too, but I wanted to wait for a compelling use-case. What's your application today? Do you use a thread pool?
I'm writing a fairly complex piece of software to perform Trac to GitHub migrations, and it makes extensive use of official and unofficial GitHub APIs.
Strictly speaking, there are no good reasons to not do everything async... However, the performance of the application is limited by the performance of the Trac server, the complexity of the pre- and post-processing, and the rate limits of the GitHub APIs. Namely, I can't realistically do more than about ~1 request per second to GitHub.
Under these circumstances, it didn't feel justified to "infect" everything with async and increase the complexity even further, because it won't improve the performance of the migration, and it should (ideally) only run once per project anyway. Also, having everything sync and linear is very nice for figuring out exactly what happened in which order and why.
Please note that I'm testing alternative implementations of the leaky bucket algorithm. The current dependency only supports rates per second, but there is a need for per minute, hour, day as well.
This is useful in general, but not necessarily for my use case :-)
Please note that I'm testing alternative implementations of the leaky bucket algorithm. The current dependency only supports rates per second, but there is a need for per minute, hour, day as well.
Could you have it work on a datetime.timedelta? Seems like the right type for the job.
I'm trying out different packages rather than creating my own implementations. So I have no control over the types used.
(Although it's crossed my mind that this might be a fun project to try my hand at Rust.)
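Even when a package only accepts per-second floats, a timedelta-based rate normalizes trivially; a hypothetical helper:

```python
import datetime as dt


def per_second(count: float, period: dt.timedelta) -> float:
    """Convert "count requests per period" into a per-second rate."""
    return count / period.total_seconds()
```

So 100 requests per minute becomes `per_second(100, dt.timedelta(minutes=1))`, i.e. about 1.67 requests per second.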
I just released v0.2.0 of httpx-limiter that switches out the leaky bucket implementation to allow for more flexible rates, i.e., not only per second.
@clbarnes you will be delighted to hear that timedelta is now supported. Plain numbers are still supported and are interpreted as seconds.
In the meantime, I have come across the GCRA algorithm, which is very poorly explained on its Wikipedia page but claims to be a much more efficient algorithm for a leaky-bucket-esque rate limiter. I've experimented a bit, and the delivery doesn't seem to be as reliable as with the leaky bucket, but not having to store a reference to, and spawn, a task for every request is a big plus.
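For anyone else trying to decode that Wikipedia page: the core of GCRA is a single stored timestamp per bucket, the "theoretical arrival time" (TAT), with no per-request bookkeeping. A minimal sketch (my own reading of the algorithm, with an injectable clock so it can be tested):

```python
import time


class GCRA:
    """Generic Cell Rate Algorithm: rate limiting with one stored timestamp.

    period -- seconds between requests at the sustained rate (T)
    burst  -- how many requests may arrive back-to-back (tau = (burst - 1) * T)
    """

    def __init__(self, period: float, burst: int = 1, clock=time.monotonic):
        self.period = period
        self.tau = (burst - 1) * period
        self.clock = clock
        self.tat = 0.0  # theoretical arrival time of the next conforming request

    def allow(self) -> bool:
        now = self.clock()
        if self.tat - now > self.tau:
            return False  # would exceed the burst allowance
        # Conforming: push the theoretical arrival time forward by one period.
        self.tat = max(self.tat, now) + self.period
        return True
```

With `period=1.0, burst=2`, two calls succeed immediately, a third is rejected, and capacity for one more returns after a second.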
@clbarnes do keep me updated on your progress, please. So far, I'm not fully happy with any of the leaky bucket implementations that I've tried.
A note for everyone that I just dropped v0.3.0 which gives you more options for implementing multiple rate limits. Check out the new tutorial.
Started a proper documentation site for it https://midnighter.github.io/httpx-limiter/
@clbarnes do keep me updated on your progress, please. So far, I'm not fully happy with any of the leaky bucket implementations that I've tried.
A note for everyone that I just dropped v0.3.0 which gives you more options for implementing multiple rate limits. Check out the new tutorial.
What's wrong with pyrate-limiter? (disclosure, not maintainer but I recently contributed a MultiprocessBucket to it, as my use case needed distributed rate limiting across multiple processes). It's been around a while and is pretty flexible to different use cases.
This is a minimal transport I'm using:
from httpx import Response, Request, AsyncHTTPTransport, HTTPTransport
from pyrate_limiter import Limiter


class RateLimiterTransport(HTTPTransport):
    def __init__(self, limiter: Limiter, **kwargs):
        super().__init__(**kwargs)
        self.limiter = limiter

    def handle_request(self, request: Request, **kwargs) -> Response:
        # Using a constant string for the item name means that the same
        # rate is applied to all requests.
        acquired = self.limiter.try_acquire("httpx_ratelimiter")
        if not acquired:
            raise RuntimeError("Did not acquire lock")
        return super().handle_request(request, **kwargs)


class AsyncRateLimiterTransport(AsyncHTTPTransport):
    def __init__(self, limiter: Limiter, **kwargs):
        super().__init__(**kwargs)
        self.limiter = limiter

    async def handle_async_request(self, request: Request, **kwargs) -> Response:
        acquired = await self.limiter.try_acquire_async("httpx_ratelimiter")
        if not acquired:
            raise RuntimeError("Did not acquire lock")
        return await super().handle_async_request(request, **kwargs)
Usage:
import httpx
from pyrate_limiter import Duration, Limiter, Rate

url = "https://.../..."

limiter = Limiter(Rate(1, Duration.SECOND), max_delay=Duration.HOUR, raise_when_fail=True)
transport = RateLimiterTransport(limiter=limiter)

with httpx.Client(transport=transport) as client:
    for _ in range(10):
        response = client.get(url)
        print(response.json())
Your example shows some of the issues that I have. There is a branch on my repo with pyrate limiter and you are welcome to suggest improvements.
My problems:
- Documentation for asynchronous use of pyrate limiter is very poor.
- I find the exposed API less convenient than other packages'. I would never want to raise an exception when I cannot acquire, as in your example; I would always want to wait until capacity is available instead, and usually without a maximum delay, although that may sometimes be useful.
- Pyrate limiter does some funky things with event loops. As a consequence, I'm not able to cancel tasks.
Thank you, and I agree... I was working on a downstream project, hit these issues, and submitted a few PRs for these exact issues, so I'm glad to hear your issues are the same as mine. It's not my project; I was just curious and wanted to share how I do it.
The PRs have since been merged, although some are waiting for the next release (3.8.2, I guess):
- updated documentation, although I'm sure more could be done.
- added working examples with good defaults, plus a limiter_factory with some helper functions.
- added retry_until_max_delay: this eliminates the weird case where acquisition returns False.
- try_acquire_async is new; I believe it'll address the asyncio issue you mentioned.
- I'm working on, but haven't quite finished, a rework of how delays are calculated, including allowing for an infinite wait (no maximum).
I fell into this rabbit hole because I was working on multiprocess rate limiting, so even if you don't use it, maybe there are some ideas you could use.
Thank you, that sounds great. I will give it another try when I'm back from vacation.
I would love to see a context API implemented; I find it very elegant, and I saw it on the roadmap for v3.