Support for maximum concurrency on SQS triggered functions
In January 2023, AWS added support for setting a maximum concurrency on SQS Lambda triggers. Details can be found here: https://aws.amazon.com/blogs/compute/introducing-maximum-concurrency-of-aws-lambda-functions-when-using-amazon-sqs-as-an-event-source/ In short: when an SQS trigger has maximum concurrency set, it holds messages back from delivery once the specified number of concurrent Lambda invocations is reached, avoiding the known failure mode where throttled invocations eventually send messages to the DLQ.
Chalice support for this would be really beneficial for flows that process large batches of SQS messages via the on_sqs_message decorator, e.g.:
```python
@app.on_sqs_message(queue='my-queue', batch_size=1, max_concurrency=20)
def handle_sqs_message(event):
    ...
```
This would be a really useful option for a project I am currently working on.
https://github.com/aws/chalice/issues/2029#issuecomment-1499833574
I faced the same issue. Here is a workaround if it can help someone.
- It applies only when deploying with `chalice package --pkg-format terraform`. Run the Python script after `chalice package` (for example in a CI/CD pipeline).
- It expects `reserved_concurrency` to be set on some lambdas in config.json, between 2 and 1000.
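For reference, that expectation might look like this in `.chalice/config.json` (the app, stage, and function names here are illustrative):

```json
{
  "version": "2.0",
  "app_name": "myapp",
  "stages": {
    "prod": {
      "lambda_functions": {
        "handle_sqs_message": {
          "reserved_concurrency": 20
        }
      }
    }
  }
}
```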
```python
import shutil
import json

CHALICE_TF_PATH = 'chalice.tf.json'

# Chalice does not currently support maximum_concurrency and function_response_types
# in the annotations that generate the event source mapping.
# * maximum_concurrency is set to the lambda's reserved_concurrent_executions
#   (see https://aws.amazon.com/blogs/compute/introducing-maximum-concurrency-of-aws-lambda-functions-when-using-amazon-sqs-as-an-event-source/)
# * function_response_types is set to ReportBatchItemFailures when batch_size is greater than 1.

# Keep a backup of the original file before modifying it.
shutil.copy2(CHALICE_TF_PATH, f'{CHALICE_TF_PATH}.before_tweak')

with open(CHALICE_TF_PATH) as f_input:
    chalice_tf: dict = json.load(f_input)

lambda_resources: dict[str, dict] = chalice_tf['resource'].get('aws_lambda_function', {})
esm_resources: dict[str, dict] = chalice_tf['resource']['aws_lambda_event_source_mapping']

for esm_name, esm_resource in esm_resources.items():
    print(f'tweak {esm_name} in {CHALICE_TF_PATH}')
    # The event source mapping name starts with the lambda's name.
    lambda_resource: dict = lambda_resources.get(esm_name.split('-')[0], {})
    reserved_concurrent_executions: int = lambda_resource.get('reserved_concurrent_executions', 0)
    if reserved_concurrent_executions:
        esm_resource['scaling_config'] = {'maximum_concurrency': reserved_concurrent_executions}
    if esm_resource.get('batch_size', 0) > 1:
        esm_resource['function_response_types'] = ['ReportBatchItemFailures']

with open(CHALICE_TF_PATH, 'w') as f_output:
    json.dump(chalice_tf, f_output, indent=2)
print(f'{CHALICE_TF_PATH} successfully tweaked')
```
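As a sanity check, the script's transformation can be exercised in isolation against a minimal synthetic `chalice.tf.json` structure (the resource names below are illustrative, assuming Chalice's `<lambda_name>-...` event source mapping naming):

```python
import json

# Minimal synthetic chalice.tf.json content (resource names are illustrative).
chalice_tf = {
    'resource': {
        'aws_lambda_function': {
            'handle_sqs_message': {'reserved_concurrent_executions': 20},
        },
        'aws_lambda_event_source_mapping': {
            'handle_sqs_message-sqs-event-source': {'batch_size': 10},
        },
    },
}

# Same transformation as the workaround script above.
lambda_resources = chalice_tf['resource'].get('aws_lambda_function', {})
esm_resources = chalice_tf['resource']['aws_lambda_event_source_mapping']
for esm_name, esm_resource in esm_resources.items():
    lambda_resource = lambda_resources.get(esm_name.split('-')[0], {})
    reserved = lambda_resource.get('reserved_concurrent_executions', 0)
    if reserved:
        esm_resource['scaling_config'] = {'maximum_concurrency': reserved}
    if esm_resource.get('batch_size', 0) > 1:
        esm_resource['function_response_types'] = ['ReportBatchItemFailures']

tweaked = esm_resources['handle_sqs_message-sqs-event-source']
print(json.dumps(tweaked, indent=2))
```

Running this should show `scaling_config` and `function_response_types` added alongside the original `batch_size`.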
This is one of a few features on the fork of chalice we are using. https://pypi.org/project/chalice-nuclei-ai/