Feature request: Async support for ApiGatewayResolver
Use case
I would love to be able to use asynchronous Python with Powertools. I understand there is not as much need for it in a Lambda runtime as a Lambda (process) will handle only one event but I would still be able to use async-only libraries like encode/databases (with the likes of Neon) or simply reduce my execution time by utilizing asyncio.gather when doing concurrent waits for I/O operations.
Solution/User Experience
I imagine being able to use it similarly to what the GraphQL resolver does:
@app.get("/todos")
@tracer.capture_method
async def get_todos():
todos: httpx.Response = httpx.get("https://jsonplaceholder.typicode.com/todos")
todos.raise_for_status()
return {"todos": todos.json()[:10]}
# You can continue to use other utilities just as before
@logger.inject_lambda_context(correlation_id_path=correlation_paths.API_GATEWAY_REST)
@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> dict:
result = app.resolve(event, context)
return asyncio.run(result)
Alternative solutions
No response
Acknowledgment
- [X] This feature request meets Powertools for AWS Lambda (Python) Tenets
- [ ] Should this be considered in other Powertools for AWS Lambda languages? i.e. Java, TypeScript, and .NET
Thanks for opening your first issue here! We'll come back to you as soon as we can. In the meantime, check out the #python channel on our Powertools for AWS Lambda Discord: Invite link
Hey @pkucmus! Wowww, we're diving into some pretty cool Feature Request here: making our EventHandler and all its resolvers work async and add more flexibility to enhance I/O operations. But it comes with some significant changes ahead. One big switch-up is how we handle object serialization. It basically means we'll need to rethink how we call Python functions and make all (or most) of them async. Another challenge is that our resolvers are a kind of Middleware between Lambda execution and the map between specific routes (GET /todos, POST /hello) to execute functions. That adds a bit of extra spice to the mix of how to execute these functions with coroutines. And when it comes to our OpenAPI validation, currently, it's all about sync calls. Shifting this to asynchronous operations is quite a significant work.
To be completely transparent, I'm unable to quantify the extent of work needed to implement this capability right now. Perhaps we should consider drafting an RFC to outline the process in detail. It might even be worth exploring the creation of entirely new resolvers that operate entirely async.
It's encouraging to see "thumbs up" from many people regarding this matter, and I believe we should seriously consider it. To ensure we stay on track, I'll add some labels such as "revisit in 3 months" and "need help" to keep this issue on our radar and facilitate brainstorming for a solution.
Thank you and please let us know if you have in mind any kind of implementation or solution for this.
Thank you @leandrodamascena, what a nice approach for a random idea like that. I would be happy to put in some work into this. Maybe this would not be as replacement for the existing resolvers but something additional? If you would like I could draft something but I would need to understand how it's currently working, i.e. the use of the global state is confusing me a lot - I'm sure there's a reason for it but I don't know why we're storing and accessing the event in a global manner. Is there chance for some guidance or rather I should stay put not to generate more work for you than needed?
https://github.com/aws-powertools/powertools-lambda-python/blob/c3e36de310208d678d11951410b00d7511cc7158/aws_lambda_powertools/event_handler/api_gateway.py#L1814-L1815
Thank you @leandrodamascena, what a nice approach for a random idea like that. I would be happy to put in some work into this. Maybe this would not be as replacement for the existing resolvers but something additional? If you would like I could draft something but I would need to understand how it's currently working
We're truly appreciate the possibility of you working on a draft for how we can incorporate support for Async in our resolvers and make it work.
i.e. the use of the global state is confusing me a lot - I'm sure there's a reason for it but I don't know why we're storing and accessing the event in a
globalmanner.
This code is a little older, probably from when we created the Event Handler utility, but it is still in use. Payloads are different when working with Event Handler, and we store the event and context in the BaseRouter class because we need to access particular fields depending on the type of Resolver, i.e:
ALB - https://github.com/aws-powertools/powertools-lambda-python/blob/develop/tests/events/albEvent.json VPC Lattice - https://github.com/aws-powertools/powertools-lambda-python/blob/develop/tests/events/vpcLatticeEvent.json API Gateway REST - https://github.com/aws-powertools/powertools-lambda-python/blob/develop/tests/events/apiGatewayProxyEvent.json
All of our resolvers inherit from ApiGatewayResolver and ApiGatewayResolver inherits from BaseRouter (the Router class too), so we store this to be easily accessible globally across all Resolvers and dependencies.
Does this explanation cover what you need, or would you like more information? Feel free to ask if you need further clarification!
Is there chance for some guidance or rather I should stay put not to generate more work for you than needed?
Of course, there is! Please feel free to share any questions or blocks you have, and we can collaborate to find the best way to move forward.
Thank you for taking the time to collaborate on this matter! :star2:
I don't know if this is entirely correct but I'm solving it this way:
from functools import wraps
import asyncio
from aws_lambda_powertools import Tracer
from aws_lambda_powertools.event_handler import APIGatewayResolver
tracer = Tracer()
app = ApiGatewayResolver()
def wrap_async(func):
@wraps(func)
def wrapper(*args, **kwargs):
return asyncio.run(func(*args,**kwargs))
return wrapper
@tracer.capture_method
@app.get("/hello")
@wrap_async
async def hello():
return { "message": "world" }
@tracer.capture_lambda_handler
def lambda_handler(event, context):
return app.resolve(event, context)
Hey @sykire,
I tried your decorator and it indeed works by leveraging asyncio.run to manage the event loop for each synchronous call to our endpoints. However, this approach seems to counteract the benefits of asynchronous programming due to the significant performance overhead introduced, which I'm yet to find out why (im using aioboto3).
I also wanted to discuss why incorporating async programming is important, especially in the context of API Gateway. In many projects, I've implemented the API Composition pattern, where the API Gateway is responsible for querying multiple services and aggregating their responses. It's very common API Gateways are responsible for doing this. This model inherently benefits from asynchronous code, as it allows for concurrent calls to multiple services, enhancing efficiency and reducing total response time from the server.
For instance, if my Lambda function needs to query 5 different AWS services, firing all these requests simultaneously (assuming independence among them) is far more efficient than making sequential calls. Given that Lambda functions already have latency issues by nature, adding synchronous operations only worsens the response time.
Moreover, I encountered compatibility issues with the X-Ray SDK when using aioboto3 for making concurrent AWS service calls via asyncio.gather. This resulted in an exception: Already ended segment and subsegment cannot be modified. (https://docs.powertools.aws.dev/lambda/python/latest/core/tracer/#concurrent-asynchronous-functions))
Consequently, I was unable to receive a successful response or track the trace for this case.
By not using asyncio.gather and just making a single asynchronous AWS call didn’t improve the situation either. Surprisingly, endpoint response times increased significantly, from approximately 800 ms to 1700 ms, when querying DynamoDB asynchronously. This suggests that the use of asyncio.run could be introducing an overhead, but I can't confirm by how much. I'll do more tests. A difference of 800ms is too much. I'll update as soon as I have more results.
After adding some debug time logs and tests I can confirm that doing asyncio.run does not add a significant overhead. I estimate the overhead of that is 0.0005 seconds based on my tests.
However, notice a 800ms gap in every request with my new async code, so I think it has to do with how the resources are being reused for subsequent calls and how I'm using the aioboto3 library.
I found this thread: https://stackoverflow.com/questions/60455830/can-you-have-an-async-handler-in-lambda-python-3-6 in which it's explained :
Please note that asyncio.run (introduced in Python 3.7) is not a proper way to call an async handler in AWS Lambda execution environment since Lambda tries to reuse the execution context for subsequent invocations.
And he also suggests adding the async boto clients at the top of the module (I was creating them in the function every time):
# To reduce execution time for subsequent invocations,
# open a reusable resource in a global scope
dynamodb = aioboto3.Session().resource('dynamodb')
I'm going to give this a try and share results. So new decorator to try would be:
def run_in_asyncio_event_loop(func):
"""
This decorator is used to run a function in the asyncio event loop.
This allows the Lambda function to run async code and reuse the same event loop between
executions, as well as resources defined in the global scope such as boto3 clients.
"""
@wraps(func)
def async_to_sync_wrapper(*args, **kwargs):
loop = asyncio.get_event_loop()
return loop.run_until_complete(func(*args, **kwargs))
return async_to_sync_wrapper
Ok, after my tests I can confirm that doing the above reduces the latency in subsequent calls effectively reusing the event loop + boto3 clients.
My only and big issue right now, is that when I use asyncio.gather I get the infamous exception reported 5 years ago here: https://github.com/aws/aws-xray-sdk-python/issues/164
I followed the docs here https://docs.powertools.aws.dev/lambda/python/2.41.0/core/tracer/#concurrent-asynchronous-functions but it's not working. I've tried almost every possible combination and reordering the decorators.
My code current looks like this:
@app.get("/<executionArn>")
@run_in_asyncio_event_loop
@tracer.capture_method
async def get_url(
executionArn: str,
) ->
...
asyncio.gather(....) # some coroutines here
Then, as explained in the docs, each coroutine uses async with tracer.provider.in_subsegment_async - But still getting the exception
@heitorlessa tagging you in hopes you might be able to help us, since I saw you contributed to this part a lot