Feature request: DynamoDB Helper Utilities
Use case
When working with DynamoDB, depending on the action being performed, there are quality-of-life improvements that can be made to simplify the overall experience. For example:
- When calling `GetItem`, automatically populate `ExpressionAttributeNames` from a list of attributes.
- When calling `Query` with a `Limit`, automatically handle pagination with `LastEvaluatedKey` until the desired number of items is returned.
  - This could also include methods for making defining a `KeyCondition` easier and populating `ExpressionAttributeNames` and `ExpressionAttributeValues`.
- When calling `BatchGetItem`, automatically retry `UnprocessedKeys` using exponential backoff with jitter (see the boilerplate sketch after this list).
- Automatically serialize and deserialize DynamoDB objects into Python objects.
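To make the boilerplate concrete, this is roughly what callers have to write today against the raw boto3 client to drain `UnprocessedKeys` with exponential backoff and jitter. The table name and keys below are hypothetical, and the retry parameters are only illustrative.

```python
import random
import time

import boto3

dynamodb = boto3.client("dynamodb")


def batch_get_with_retries(table_name: str, keys: list, max_retries: int = 8) -> list:
    """Fetch up to 100 items, retrying UnprocessedKeys with exponential backoff and jitter."""
    request_items = {table_name: {"Keys": keys}}
    items = []
    for attempt in range(max_retries + 1):
        response = dynamodb.batch_get_item(RequestItems=request_items)
        items.extend(response.get("Responses", {}).get(table_name, []))
        request_items = response.get("UnprocessedKeys") or {}
        if not request_items:
            return items
        # Full jitter: sleep a random duration bounded by an exponentially growing window
        time.sleep(random.uniform(0, min(5, 0.05 * (2 ** attempt))))
    raise RuntimeError("UnprocessedKeys remained after all retries")


# Hypothetical usage, with keys still in DynamoDB's low-level attribute-value format:
# batch_get_with_retries("my-table", [{"pk": {"S": "USER#1"}}, {"pk": {"S": "USER#2"}}])
```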
Solution/User Experience
A helper class that implements some of these methods, such as:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import random
import time
from typing import Dict, Any, List, TYPE_CHECKING, Generator, Tuple
from aws_lambda_powertools import Logger
import boto3
from boto3.dynamodb.types import TypeDeserializer, TypeSerializer
from botocore.config import Config
if TYPE_CHECKING:
from mypy_boto3_dynamodb import DynamoDBClient
logger = Logger(child=True)
__all__ = ["DynamoDBHelpers", "TransactionWriter"]
class TransactionWriter:
"""
Automatically handle building a transaction to DynamoDB.
"""
def __init__(self, table_name: str, client: "DynamoDBClient", flush_amount: int = 100) -> None:
self._table_name = table_name
self._client = client
self._flush_amount = flush_amount
self._items_buffer: List[Dict[str, Any]] = []
def put_item(self, item: Dict[str, Any]) -> None:
item["TableName"] = self._table_name
self._add_request_and_process({"Put": item})
def update_item(self, item: Dict[str, Any]) -> None:
item["TableName"] = self._table_name
self._add_request_and_process({"Update": item})
def delete_item(self, item: Dict[str, Any]) -> None:
item["TableName"] = self._table_name
self._add_request_and_process({"Delete": item})
def check_item(self, item: Dict[str, Any]) -> None:
item["TableName"] = self._table_name
self._add_request_and_process({"ConditionCheck": item})
def _add_request_and_process(self, request: Dict[str, Any]) -> None:
self._items_buffer.append(request)
self._flush_if_needed()
def _flush_if_needed(self) -> None:
if len(self._items_buffer) >= self._flush_amount:
self._flush()
def _flush(self):
items_to_send = self._items_buffer[: self._flush_amount]
self._items_buffer = self._items_buffer[self._flush_amount :]
self._client.transact_write_items(TransactItems=items_to_send)
logger.debug(
f"Transaction write sent {len(items_to_send)}, unprocessed: {len(self._items_buffer)}"
)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, tb):
# When we exit, we need to keep flushing whatever's left until there's nothing left in our items buffer.
while self._items_buffer:
self._flush()
class DynamoDBHelpers:
MAX_BATCH_GET_SIZE = 100
MAX_TRANSACTION_WRITE_SIZE = 100
# Maximum number of retries
MAX_RETRIES = 15
# Minimum sleep time
MIN_SLEEP_TIME = 1e-2
_deserializer = TypeDeserializer()
_serializer = TypeSerializer()
_client = None
def __init__(self, table_name: str, region: str, session: boto3.Session = None) -> None:
if not session:
session = boto3._get_default_session()
config = Config(
retries={
"max_attempts": self.MAX_RETRIES,
"mode": "standard",
}
)
self._client: "DynamoDBClient" = session.client(
"dynamodb", region_name=region, config=config
)
self._table_name = table_name
@property
def client(self):
return self._client
def transaction_writer(self) -> TransactionWriter:
return TransactionWriter(self._table_name, self._client, self.MAX_TRANSACTION_WRITE_SIZE)
def paginated_query(
self, params: Dict[str, Any], page_size: int, acc: List[Dict[str, Any]] = None
) -> Tuple[List, bool]:
"""
@see https://dev.to/andyrichardsonn/graphql-pagination-with-dynamodb-putting-it-together-20mn
"""
if acc is None:
acc = []
logger.debug("paginated_query", params=params, page_size=page_size)
remaining = page_size - len(acc)
result = self._client.query(**params)
new_items = result.get("Items", [])
new_acc = [*acc, *(new_items[0:remaining])]
cursor = result.get("LastEvaluatedKey")
if not cursor:
return new_acc, len(new_items) > remaining
if len(new_acc) < page_size or len(new_items) <= remaining:
return self.paginated_query(params | {"ExclusiveStartKey": cursor}, page_size, new_acc)
return new_acc, True
def get_items(
self, keys: List[Dict[str, Any]], attributes: List[str] = None, in_transaction: bool = False
) -> Generator[Dict[str, Any], None, None]:
projection_attrs = {}
if attributes:
names = {}
placeholders = []
for idx, attribute in enumerate(attributes):
placeholder = f"#a{idx}"
names[placeholder] = attribute
placeholders.append(placeholder)
projection_attrs["ExpressionAttributeNames"] = names
projection_attrs["ProjectionExpression"] = ",".join(placeholders)
for key_chunk in self._batch_keys(keys):
if in_transaction:
items = self._get_items_transaction(key_chunk, projection_attrs)
else:
items = self._get_items_batch(key_chunk, projection_attrs)
for item in items:
yield item
def _get_items_batch(
self, keys: List[Dict[str, Any]], attributes: Dict[str, Any] = None
) -> List[Dict[str, Any]]:
"""
Gets a batch of items from Amazon DynamoDB in a batch from a single table.
"""
if not keys:
return []
num_keys = len(keys)
if num_keys > self.MAX_BATCH_GET_SIZE:
raise Exception(
f"{num_keys} exceeds maximum batch get size of {self.MAX_BATCH_GET_SIZE}"
)
request_items = {
            self._table_name: {
                "Keys": [self._serialize(key) for key in keys],
            }
}
if attributes:
request_items[self._table_name] |= attributes
response = self._client.batch_get_item(RequestItems=request_items)
items = response.get("Responses", {}).get(self._table_name, [])
results: List[Dict[str, Any]] = [self._deserialize(item) for item in items]
num_retries = 0
while response.get("UnprocessedKeys", {}).get(self._table_name, None) is not None:
num_keys = len(response.get("UnprocessedKeys", {}).get(self._table_name, []))
num_retries += 1
if num_retries > self.MAX_RETRIES:
num_retries = random.randint(1, self.MAX_RETRIES)
sleep_time = self.MIN_SLEEP_TIME * random.randint(1, 2**num_retries)
logger.debug(
f"Re-fetching {num_keys} keys, retry {num_retries} of {self.MAX_RETRIES}, sleeping for {sleep_time} seconds"
)
time.sleep(sleep_time)
response = self._client.batch_get_item(RequestItems=response["UnprocessedKeys"])
items = response.get("Responses", {}).get(self._table_name, [])
results.extend([self._deserialize(item) for item in items])
return results
def _get_items_transaction(
self, keys: List[Dict[str, Any]], attributes: Dict[str, Any] = None
) -> List[Dict[str, Any]]:
"""
Gets a batch of items from Amazon DynamoDB in a transaction.
"""
if not keys:
return []
num_keys = len(keys)
if num_keys > self.MAX_BATCH_GET_SIZE:
raise Exception(
f"{num_keys} exceeds maximum batch get size of {self.MAX_BATCH_GET_SIZE}"
)
items = []
for key in keys:
item = {
"Get": {
"Key": self._serialize(key),
"TableName": self._table_name,
}
}
if attributes:
item["Get"] |= attributes
items.append(item)
response = self._client.transact_get_items(TransactItems=items)
items = response.get("Responses", [])
return [self._deserialize(item["Item"]) for item in items]
@classmethod
def _batch_keys(cls, keys: List[Dict[str, Any]]) -> Generator[List[Dict[str, Any]], None, None]:
end = len(keys)
for index in range(0, end, cls.MAX_BATCH_GET_SIZE):
yield keys[index : min(index + cls.MAX_BATCH_GET_SIZE, end)]
@classmethod
def _deserialize(cls, item: Any) -> Any:
if not item:
return item
if isinstance(item, dict) and "M" not in item:
item = {"M": item}
return cls._deserializer.deserialize(item)
@classmethod
def _serialize(cls, obj: Any) -> Dict[str, Any]:
result = cls._serializer.serialize(obj)
if "M" in result:
result = result["M"]
return result
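To make the intended ergonomics concrete, here is a sketch of how the helpers above might be used. The table, keys, and attribute names are made up for illustration, and the calls are limited to what the class above defines.

```python
helpers = DynamoDBHelpers(table_name="orders", region="us-east-1")

# Fetch a projection of two attributes for a list of keys; chunking into batches,
# retrying UnprocessedKeys, and (de)serialization all happen inside the helper.
orders = list(
    helpers.get_items(
        keys=[{"pk": "ORDER#1"}, {"pk": "ORDER#2"}],
        attributes=["status", "total"],
    )
)

# Buffer writes and flush them as transactions of up to 100 items.
with helpers.transaction_writer() as tx:
    for order in orders:
        tx.put_item({"Item": {"pk": {"S": f"ARCHIVE#{order['pk']}"}}})
```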
Alternative solutions
PynamoDB is one alternative that has its own model structure. I'm implementing a single-table design and wanted more control over the items being inserted into the table.
What does a good UX look like?
Users shouldn't be concerned with how DynamoDB internally represents types, so all outside-in and inside-out data conversions should be run through the DynamoDB TypeSerializer and TypeDeserializer classes. Fetching individual attributes can also be simplified to a List[str], from which we internally build the ProjectionExpression and populate ExpressionAttributeNames.
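For reference, a minimal sketch of the two conversions described above: running items through boto3's TypeSerializer/TypeDeserializer, and turning a plain List[str] of attributes into a ProjectionExpression with ExpressionAttributeNames. The attribute names are only examples.

```python
from boto3.dynamodb.types import TypeDeserializer, TypeSerializer

serializer = TypeSerializer()
deserializer = TypeDeserializer()

# Python dict -> DynamoDB attribute-value format, and back again
item = {"pk": "ORDER#1", "tags": ["priority"]}
wire_item = {k: serializer.serialize(v) for k, v in item.items()}  # {"pk": {"S": ...}, ...}
assert {k: deserializer.deserialize(v) for k, v in wire_item.items()} == item

# List[str] -> ProjectionExpression + ExpressionAttributeNames (placeholders avoid reserved words)
attributes = ["status", "total"]
names = {f"#a{i}": attr for i, attr in enumerate(attributes)}
projection_expression = ",".join(names)  # "#a0,#a1"
```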
Acknowledgment
- [X] This feature request meets Lambda Powertools Tenets
- [X] Should this be considered in other Lambda Powertools languages? i.e. Java, TypeScript
hey @jplock thanks a lot for the suggestion, would you have a sample of what you and/or customers typically have to do themselves? Perhaps an RFC fits better here?
I need some time to digest the proposed UX; it's almost like we need to break it down so it's easy to compare the current boilerplate vs. how it could look.
On TypeDeserializer, we've updated the DynamoDB Stream Event Source Data Class in V2 to use exactly what Boto3 uses, including nested types: https://awslabs.github.io/aws-lambda-powertools-python/2.10.0/utilities/data_classes/#dynamodb-streams. Not exactly the same, as you're after "Single Table" and coming from a different angle.
Adding a label to get more customer feedback. I could be wrong, but this might attract more feedback if the spotlight is primarily on customers doing "Single Table" design, which brings other challenges I'm not entirely comfortable suggesting to customers (it's quite contextual).
Thank you!!!
I agree that there are many quality-of-life improvements that can be made by wrapping the existing boto3 DynamoDB client. That drove me to write a similar solution for my own team's internal use, drawing inspiration from SQLAlchemy and trying to retain the flexibility of a single-table design together with Pydantic.
To illustrate:
class Customer(BaseModel):
    name: str

db = DynamoDB()
stmt = (
    Get(pk="C#", sk_condition=Ops.begin_with, sk="CUSTOMER#")
    .target("GSI1")
    .filter_by("name", Ops.eq, "John")
    .limit(100)
)
customer: Customer = db.exec(stmt).parse(Customer)
As my team has very specific use cases, it was not general-purpose enough to be open-sourced directly. But I am considering creating a separate open source project to further iterate on the improvements I found; I came here to check whether something is already in the pipeline for this project.
However, I think it would be really nice to have Lambda Powertools for Python bundle this as a utility; with the current developers, users, and community of this project providing feedback and improvements rather than just myself, this helper might see better light.
hey @Stewart86 this reminds me of SQLModel in a way. I do agree that we could do something in this area despite not wanting to in the earlier days - we keep seeing a lot of boilerplate and suboptimal logic as customers try to handle multiple scenarios themselves.
If you're up for it, create an RFC using Pydantic and we can help shape it, ask for a POC later on, and get to a good UX that is easy to follow for any persona (e.g., developer, data engineer, Ops, etc.).
exciting!
To add additional context, I’m looking for easier ways to construct DynamoDB operations that don’t involve defining models or creating an ORM.
It would be neat to have a layer that can translate Pydantic models and queries against a single DynamoDB table. But I wouldn’t want it to turn into Pynamo.
Yup @jplock that's aligned with what we've been preaching for a while. We'd be happy with a query builder that exposes the raw query to aid understanding and testing, plus value-add enhancements like saving/retrieving large payloads in S3 (>400 KB).
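Not something Powertools offers today, but to picture the large-payload enhancement mentioned above: a rough sketch that offloads items bigger than DynamoDB's 400 KB limit to S3 and stores a pointer instead. The bucket, key, and attribute names are made up.

```python
import json

import boto3

s3 = boto3.client("s3")
MAX_ITEM_BYTES = 400 * 1024  # DynamoDB's hard per-item size limit


def offload_if_large(item: dict, bucket: str, key: str) -> dict:
    """Return the item unchanged if it fits, otherwise store it in S3 and return a pointer item."""
    body = json.dumps(item).encode("utf-8")
    if len(body) <= MAX_ITEM_BYTES:
        return item
    s3.put_object(Bucket=bucket, Key=key, Body=body)
    return {"pk": item.get("pk"), "s3_pointer": f"s3://{bucket}/{key}"}
```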
PynamoDB is the option we'd recommend for anyone looking for an actual ORM.
Hi, I've been quite time-poor for a while, but I did start a Python port of the Node DynamoDB Toolbox by Jeremy Daly. It's not an ORM but provides a whole lot of convenience around single-table design and makes loading/saving data simpler IMO. For a lot of use cases I find Pydantic a little heavy and quite rigid in the way classes are constructed (maybe it's changed?). DynamoDB Toolbox is a drop-in replacement for the aws-sdk from a response perspective. I have also recently found another Node library, created by SenseDeep, that I am investigating for a port.
I agree the boto3 API is awful and something needs to be done here. For an ORM-style interface I would also suggest Pydantic.
hey @arjanschaaf, as you're our trusted DynamoDB Specialist, feel free to create an RFC about this topic to help expedite it, or feel welcome to contribute to the RFC once someone creates it.
What I'm interested in from an RFC point of view:
- Initial focus for a query builder (not ORM)
- How do we support data validation, data serialization, and data deserialization
- How do we reuse existing DynamoDB knowledge and make it easier to search stuff on StackOverflow/AWS Docs?
- A mechanism to easily access the raw query to demystify things and make testing easier (a rough sketch of this idea follows this list)
- Known hazards we need to be careful with, e.g., filter, limit, pagination, transactions, transaction error handling, etc.
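As one way to picture the "expose the raw query" point in this list, a purely hypothetical builder sketch; none of these names exist in Powertools today, it only illustrates handing back the exact kwargs that would go to client.query for inspection and testing.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class QueryBuilder:
    """Hypothetical builder whose raw boto3 Query parameters are always inspectable."""

    table_name: str
    _params: Dict[str, Any] = field(default_factory=dict)

    def key(self, name: str, value: str) -> "QueryBuilder":
        self._params["KeyConditionExpression"] = "#k = :v"
        self._params["ExpressionAttributeNames"] = {"#k": name}
        self._params["ExpressionAttributeValues"] = {":v": {"S": value}}
        return self

    def limit(self, limit: int) -> "QueryBuilder":
        self._params["Limit"] = limit
        return self

    def to_query_params(self) -> Dict[str, Any]:
        # The escape hatch: exactly what would be passed to client.query(**params),
        # which also makes unit testing trivial without any network calls.
        return {"TableName": self.table_name, **self._params}


# QueryBuilder("orders").key("pk", "ORDER#1").limit(25).to_query_params()
```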
another inspiration: https://docs.aws.amazon.com/sdk-for-ruby/aws-record/api/Aws/Record.html
I just found https://github.com/nayaverdier/dyntastic. It's missing transaction support but maybe the author would be willing to add it.
Hello, I am coming from the Dyntastic issue that mentions this one. I wanted to know if you are going with Dyntastic for this issue, as we want to adopt the library and the missing transaction support is critical for us. The feature is already there, but the developer of the project is waiting for your input to release the version with support for it. Thank you! :smiley_cat:
hey @photonbit, as of now we're missing an RFC that can answer these: https://github.com/aws-powertools/powertools-lambda-python/issues/2053#issuecomment-1531170523
There we can discuss whether relying on Dyntastic with a thin wrapper is the right choice*, or whether to build one inspired by the work Dyntastic did. As of now, we're focused on these roadmap items for major features. We'd rely on an RFC discussion, community input, and contributions if we want to get this out this year.
My main concern with DynamoDB is making sure we keep it light, don't build an ORM, and set customers up for success by keeping them out of trouble when using limits/filters and the like.
*right choice means investigating whether it's maintained, how stable it is for production (thousands of customers, compliance, etc.), how easy it is to contribute upstream, and whether it matches the RFC, our tenets, and community expectations on UX.
@heitorlessa sorry for overlooking the conversation, and thank you very much for the detailed response; it is a delight to see a project with proper information and roadmap structure :smile_cat: