
Support for maximum concurrency on SQS triggered functions

Open ivandjuricic opened this issue 2 years ago • 4 comments

In January 2023, AWS added support for setting a maximum concurrency on SQS Lambda triggers. Details can be found here: https://aws.amazon.com/blogs/compute/introducing-maximum-concurrency-of-aws-lambda-functions-when-using-amazon-sqs-as-an-event-source/ TL;DR: when an SQS trigger has a maximum concurrency set, messages are held back in the queue once the specified maximum Lambda concurrency is reached, which avoids the known behaviour where throttled messages end up in the DLQ.
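
For context, outside Chalice this setting lives on the event source mapping itself. Below is a minimal sketch using boto3, assuming the mapping already exists. The helper names and the UUID value are hypothetical placeholders; `ScalingConfig` and `MaximumConcurrency` are the parameter names in the Lambda UpdateEventSourceMapping API.

```python
def build_scaling_update(mapping_uuid: str, max_concurrency: int) -> dict:
    """Build keyword arguments for update_event_source_mapping.

    Hypothetical helper; Lambda accepts values from 2 to 1000 here.
    """
    if not 2 <= max_concurrency <= 1000:
        raise ValueError("maximum concurrency must be between 2 and 1000")
    return {
        "UUID": mapping_uuid,
        "ScalingConfig": {"MaximumConcurrency": max_concurrency},
    }


def apply_scaling_update(mapping_uuid: str, max_concurrency: int) -> dict:
    """Send the update to AWS (needs boto3 and valid credentials)."""
    import boto3  # imported lazily so the builder above has no dependencies

    client = boto3.client("lambda")
    return client.update_event_source_mapping(
        **build_scaling_update(mapping_uuid, max_concurrency)
    )
```

Calling `apply_scaling_update("<mapping-uuid>", 20)` would cap the trigger at 20 concurrent invocations. A change made this way is not tracked by Chalice, so a later deploy may not preserve it.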

Chalice support for this would benefit many flows in which queues process large batches of SQS messages through the on_sqs_message decorator.

For example (max_concurrency is the proposed new parameter):

@app.on_sqs_message(queue='my-queue', batch_size=1, max_concurrency=20)
def handle_sqs_message(event):
    ...

Mar 30 '23 15:03 ivandjuricic

This would be a really useful option to a project I am currently working on

Apr 06 '23 23:04 dorukhan-ti

This would be a really useful option to a project I am currently working on

https://github.com/aws/chalice/issues/2029#issuecomment-1499833574

Apr 07 '23 01:04 dorukhan-ti

I faced the same issue. Here is a workaround in case it helps someone.

  • It applies only when deploying with chalice package --pkg-format terraform. Run the Python script after chalice package (for example, in a CI/CD pipeline).
  • It expects reserved_concurrency to be set, to a value between 2 and 1000, on the relevant Lambda function in config.json.

import shutil
import json

CHALICE_TF_PATH = 'chalice.tf.json'

# Chalice does not currently support maximum_concurrency and function_response_types
# in the annotations that generate the event source mapping.
# * maximum_concurrency is set to lambda's reserved_concurrent_executions
#   (see https://aws.amazon.com/blogs/compute/introducing-maximum-concurrency-of-aws-lambda-functions-when-using-amazon-sqs-as-an-event-source/)
# * function_response_types is set to ReportBatchItemFailures when batch_size is greater than 1.

shutil.copy2(CHALICE_TF_PATH, f'{CHALICE_TF_PATH}.before_tweak')

# Load the Terraform JSON generated by `chalice package`.
with open(CHALICE_TF_PATH) as f_input:
    chalice_tf: dict = json.load(f_input)

lambda_resources: dict[str, dict] = chalice_tf['resource'].get('aws_lambda_function', {})
# Use .get() so the script does not fail when there is no event source mapping.
esm_resources: dict[str, dict] = chalice_tf['resource'].get('aws_lambda_event_source_mapping', {})

for esm_name, esm_resource in esm_resources.items():
    print(f'tweak {esm_name} in {CHALICE_TF_PATH}')

    # Assumes the mapping name starts with '<lambda name>-', so the prefix identifies its lambda.
    lambda_resource: dict = lambda_resources.get(esm_name.split('-')[0], {})
    reserved_concurrent_executions: int = lambda_resource.get('reserved_concurrent_executions', 0)
    if reserved_concurrent_executions:
        esm_resource['scaling_config'] = {'maximum_concurrency': reserved_concurrent_executions}

    if esm_resource.get('batch_size', 0) > 1:
        esm_resource['function_response_types'] = ['ReportBatchItemFailures']

with open(CHALICE_TF_PATH, 'w') as f_output:
    json.dump(chalice_tf, f_output, indent=2)

print(f'{CHALICE_TF_PATH} successfully tweaked')
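
To see what the script changes, the same logic can be run on an in-memory sample. This is only a sketch: the function name `tweak` and the resource names and values below are made up for illustration, and the sample merely mimics the shape of chalice.tf.json that the script assumes (an event source mapping whose name starts with the lambda name followed by a hyphen).

```python
import copy


def tweak(chalice_tf: dict) -> dict:
    """Return a tweaked copy of the Terraform dict, leaving the input untouched."""
    result = copy.deepcopy(chalice_tf)
    resources = result.get('resource', {})
    lambdas = resources.get('aws_lambda_function', {})
    mappings = resources.get('aws_lambda_event_source_mapping', {})

    for esm_name, esm in mappings.items():
        # Look up the lambda by the part of the mapping name before the first hyphen.
        lambda_resource = lambdas.get(esm_name.split('-')[0], {})
        reserved = lambda_resource.get('reserved_concurrent_executions', 0)
        if reserved:
            esm['scaling_config'] = {'maximum_concurrency': reserved}
        if esm.get('batch_size', 0) > 1:
            esm['function_response_types'] = ['ReportBatchItemFailures']
    return result


# Hypothetical sample, not real Chalice output.
sample = {
    'resource': {
        'aws_lambda_function': {
            'handle_sqs_message': {'reserved_concurrent_executions': 20},
        },
        'aws_lambda_event_source_mapping': {
            'handle_sqs_message-sqs-event-source': {'batch_size': 10},
        },
    }
}

tweaked = tweak(sample)
print(tweaked['resource']['aws_lambda_event_source_mapping'])
```

With this sample, the mapping gains a scaling_config block copied from the lambda's reserved concurrency, plus function_response_types because batch_size is greater than 1.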

Jan 19 '24 16:01 Ecitperbo

This is one of a few features in the fork of Chalice that we are using: https://pypi.org/project/chalice-nuclei-ai/

Apr 19 '24 14:04 henryivesjones