Add transactional batch support for Cosmos DB Spark connector
Description
This PR adds transactional batch operation support to the Azure Cosmos DB Spark 3 connector, enabling atomic multi-document upsert operations within a single partition. The implementation reuses the standard bulk ingestion pipeline, adding a dedicated `TransactionalBulkWriter`/`TransactionalBulkExecutor` and configuration-based activation to ensure all-or-nothing execution semantics.
Key Features
- Atomic Operations: All operations in a batch succeed or fail together (ACID guarantees via Cosmos DB transactional batch API)
- Standard Bulk Pipeline: Reuses existing bulk write infrastructure with transactional semantics (no separate API endpoints)
- ItemOverwrite Strategy: Uses `ItemWriteStrategy.ItemOverwrite` (upsert operations)
- Configuration-Based: Enable via the `spark.cosmos.write.bulk.transactional=true` configuration property
- Hierarchical Partition Keys: Full support for 1-3 level hierarchical partition keys
- 100-Operation Limit Enforcement: Automatically validates and enforces Cosmos DB's 100 operations per partition key limit
- Automatic Partition Key Buffering: Groups consecutive operations by partition key and flushes batches on partition key changes
- Spark 3.5 Optimization: Automatic data distribution and ordering via the `RequiresDistributionAndOrdering` interface for optimal batching performance
- Backward Compatibility: Works on Spark 3.3/3.4 (manual sorting recommended for best performance) and Spark 3.5+ (automatic sorting)
Usage Examples
Configuration
Transactional batch mode is enabled using the `spark.cosmos.write.bulk.transactional` configuration property:

```python
# Enable transactional batch via configuration
writeConfig = {
    "spark.cosmos.write.bulk.transactional": "true",
    "spark.cosmos.write.bulk.enabled": "true",  # Bulk mode must be enabled
    # ... other cosmos config
}

df.write.format("cosmos.oltp").options(**writeConfig).save()
```
Example 1: Basic Upsert Operations
All operations in transactional batch mode use the `ItemOverwrite` write strategy (upsert):

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define schema for transactional batch writes
schema = StructType([
    StructField("id", StringType(), False),
    StructField("pk", StringType(), False),
    StructField("name", StringType(), False),
    StructField("age", IntegerType(), False)
])

# Create DataFrame - all operations will be upserts
items = [
    ("item1", "partition1", "Alice", 30),
    ("item2", "partition1", "Bob", 25),
    ("item3", "partition1", "Charlie", 35)
]
df = spark.createDataFrame(items, schema)

# Execute transactional batch - all operations succeed or fail together
df.write \
    .format("cosmos.oltp") \
    .option("spark.cosmos.write.bulk.transactional", "true") \
    .option("spark.cosmos.write.bulk.enabled", "true") \
    .options(**cosmosConfig) \
    .save()
```
Note: For Spark 3.3/3.4 users, manually sort data by partition key for optimal performance:
```python
# Recommended for Spark 3.3/3.4 (automatic in Spark 3.5+)
df.repartition("pk").sortWithinPartitions("pk").write \
    .format("cosmos.oltp") \
    .option("spark.cosmos.write.bulk.transactional", "true") \
    .option("spark.cosmos.write.bulk.enabled", "true") \
    .options(**cosmosConfig) \
    .save()
```
Example 2: Financial Instrument Temporal Versioning (Hierarchical Partition Keys)
This example demonstrates hierarchical partition keys for temporal versioning of financial instruments. The partition key consists of two levels: PermId (instrument identifier) and SourceId (data source):
```python
from datetime import datetime
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Define schema with hierarchical partition key: PermId (level 1) + SourceId (level 2)
schema = StructType([
    StructField("id", StringType(), False),
    StructField("PermId", StringType(), False),    # Partition key level 1
    StructField("SourceId", StringType(), False),  # Partition key level 2
    StructField("ValidFrom", TimestampType(), False),
    StructField("ValidTo", TimestampType(), True),
    StructField("Price", DoubleType(), False),
    StructField("Currency", StringType(), False)
])

# Temporal update: close old record and create new version atomically
# Both operations use the same hierarchical partition key [PermId="MSFT", SourceId="Bloomberg"]
operations = [
    # Close the current active record by setting ValidTo
    ("inst-msft-v1", "MSFT", "Bloomberg", datetime(2024, 1, 1), datetime(2024, 12, 1), 100.50, "USD"),
    # Create new version with updated price
    ("inst-msft-v2", "MSFT", "Bloomberg", datetime(2024, 12, 1), None, 105.75, "USD")
]
df = spark.createDataFrame(operations, schema)

# Execute atomic temporal update - both succeed or both fail
df.write \
    .format("cosmos.oltp") \
    .option("spark.cosmos.write.bulk.transactional", "true") \
    .option("spark.cosmos.write.bulk.enabled", "true") \
    .options(**cosmosConfig) \
    .save()
```
Note: In this example, PermId and SourceId together form the hierarchical partition key (2 levels). All operations in the same batch must share the same partition key values to maintain atomicity. All operations use ItemOverwrite write strategy.
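For Spark 3.3/3.4, the same manual-sorting guidance applies to hierarchical partition keys: sort by all partition key columns so that operations for the same key arrive consecutively. A minimal sketch, assuming the container's partition key paths are `/PermId` and `/SourceId`:

```python
# Spark 3.3/3.4 only: repartition and sort by every partition key column
# (Spark 3.5+ performs this distribution/ordering automatically)
df.repartition("PermId", "SourceId") \
    .sortWithinPartitions("PermId", "SourceId") \
    .write \
    .format("cosmos.oltp") \
    .option("spark.cosmos.write.bulk.transactional", "true") \
    .option("spark.cosmos.write.bulk.enabled", "true") \
    .options(**cosmosConfig) \
    .save()
```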
Input DataFrame Schema
Your DataFrame should have flat columns representing document properties:
| Column | Type | Required | Description |
|---|---|---|---|
| id | String | Yes | Document identifier |
| pk (or partition key path) | String/Multiple | Yes | Partition key value(s) - supports hierarchical keys |
| ...additional columns... | Any | No | Document properties (converted to JSON) |
Note: For hierarchical partition keys, include all partition key path columns (e.g., PermId, SourceId). All operations use ItemOverwrite write strategy.
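To illustrate the mapping, here is a rough sketch (not actual connector output) of how one row from Example 1 is upserted as a Cosmos DB document; flat columns become top-level JSON properties, and system properties such as `_etag` and `_ts` are added by the service:

```python
# Input row from Example 1 ...
row = ("item1", "partition1", "Alice", 30)

# ... becomes (approximately) this JSON document in the container
document = {
    "id": "item1",        # required document identifier
    "pk": "partition1",   # partition key value
    "name": "Alice",      # additional columns map to document properties
    "age": 30
}
```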
Implementation Architecture
Standard Bulk Pipeline Integration
The transactional batch feature integrates with the existing bulk write infrastructure (a simplified sketch of the grouping logic follows this list):
- Configuration Detection (`ItemsWriterBuilder`):
  - Detects the `spark.cosmos.write.bulk.transactional=true` configuration
  - Automatically instantiates `TransactionalBulkWriter` instead of `BulkWriter`
- Partition Key Buffering (`TransactionalBulkWriter`):
  - Receives operations from Spark partitions
  - Emits operations to the `TransactionalBulkExecutor` reactor pipeline
- Batch Grouping (`TransactionalBulkExecutor`):
  - Groups consecutive operations by partition key using the `bufferUntil()` operator
  - Flushes a batch when the partition key changes or the 100-operation limit is reached
  - Creates a `CosmosBatch` with `upsertItemOperation()` calls (ItemOverwrite strategy)
- Atomic Execution:
  - Sends the batch to Cosmos DB via the `executeBatchRequest()` API with atomic semantics
  - All operations succeed or fail together (no partial success)
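To visualize the buffering and flushing rules above, here is a minimal Python sketch of the grouping behavior; it is illustrative only (the connector itself implements this on the JVM with Reactor's `bufferUntil()`), and `group_into_batches` is a hypothetical helper, not part of the connector API:

```python
# Illustrative sketch of the batch-grouping rules described above.
MAX_OPS_PER_BATCH = 100  # Cosmos DB transactional batch limit

def group_into_batches(operations):
    """Group consecutive (partition_key, document) pairs into per-key batches.

    A batch is flushed when the partition key changes or when it reaches the
    100-operation limit; each emitted batch corresponds to one CosmosBatch of
    upsert operations executed atomically.
    """
    batch, current_pk = [], None
    for pk, doc in operations:
        # Flush on partition key change or when the batch is full
        if batch and (pk != current_pk or len(batch) == MAX_OPS_PER_BATCH):
            yield current_pk, batch
            batch = []
        current_pk = pk
        batch.append(doc)
    if batch:
        yield current_pk, batch
```

Because grouping only considers consecutive operations, the writer never reorders data itself; it relies on Spark 3.5+ (or manual sorting on 3.3/3.4) to deliver rows ordered by partition key.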
Spark Version Compatibility
| Spark Version | Status | Distribution/Ordering | Performance |
|---|---|---|---|
| Spark 3.3 | ✅ Supported | Manual (user must sort) | Good (with manual sorting) |
| Spark 3.4 | ✅ Supported | Manual (user must sort) | Good (with manual sorting) |
| Spark 3.5+ | ✅ Supported | Automatic via `RequiresDistributionAndOrdering` | Optimal (automatic sorting) |
Why it works on all Spark versions:
- The `TransactionalBulkExecutor` performs partition key buffering at the writer level, independent of Spark's distribution/ordering
- Spark 3.3/3.4: The feature works correctly, but users should manually sort data by partition key for optimal batching
- Spark 3.5+: The `RequiresDistributionAndOrdering` interface automatically instructs Spark to repartition and sort data by partition key columns, ensuring consecutive operations share the same partition key for maximum batch efficiency
Performance Impact:
- Without sorting (Spark 3.3/3.4 without manual sort): Many small batches (1-2 operations each)
- With sorting (Spark 3.3/3.4 with manual sort OR Spark 3.5+ automatic): Optimal batches (up to 100 operations each)
- Recommendation: Always sort by partition key on Spark 3.3/3.4 for best performance
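The effect is easy to see with a small, driver-side illustration; `count_batches` below is a hypothetical helper (not part of the connector) that applies the same "new batch on partition key change or at 100 operations" rule to an ordered list of partition key values:

```python
# Count how many transactional batches a given ordering of partition keys produces.
def count_batches(pk_values, max_ops=100):
    batches, current_pk, size = 0, object(), 0
    for pk in pk_values:
        if pk != current_pk or size == max_ops:
            batches, current_pk, size = batches + 1, pk, 0
        size += 1
    return batches

print(count_batches(["pk1", "pk2", "pk1", "pk2"]))  # 4 batches - unsorted input
print(count_batches(["pk1", "pk1", "pk2", "pk2"]))  # 2 batches - sorted input
```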
Constraints
- ItemOverwrite Only: Only `ItemWriteStrategy.ItemOverwrite` (upsert) is supported in transactional batch mode
- Same Partition Key: All operations in a batch must target the same partition key value(s)
- 100 Operation Limit: Batches cannot exceed 100 operations per partition key (enforced with clear error messages; see the pre-flight check sketch after this list)
- Atomicity: All operations succeed or fail together within each batch (no partial success)
- 2MB Size Limit: Total batch payload cannot exceed 2MB
- Hierarchical Keys: Supports up to 3-level hierarchical partition keys
- Bulk Write Required: Must have `spark.cosmos.write.bulk.enabled=true` (enabled by default)
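A pre-flight check can surface the 100-operation limit before a write is attempted. This is a minimal sketch, assuming a single partition key column named `pk` (group by all partition key columns for hierarchical keys); it is not part of the connector API:

```python
from pyspark.sql import functions as F

# Flag partition keys that would exceed the 100-operations-per-key limit.
oversized = (
    df.groupBy("pk")
      .agg(F.count("*").alias("op_count"))
      .filter(F.col("op_count") > 100)
)

if oversized.count() > 0:
    oversized.show(truncate=False)
    raise ValueError("Some partition keys exceed 100 operations; "
                     "split them before writing in transactional mode.")
```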
Error Handling
Transactional batch operations follow all-or-nothing semantics:
```python
# Any failure in the batch causes all operations to roll back
try:
    df.write \
        .format("cosmos.oltp") \
        .option("spark.cosmos.write.bulk.transactional", "true") \
        .option("spark.cosmos.write.bulk.enabled", "true") \
        .options(**cosmosConfig) \
        .save()
except Exception as e:
    # Batch failed - no operations were committed
    print(f"Transaction failed: {e}")
```
Common Error Scenarios:
- 400 Bad Request: Invalid document structure or exceeded size limits
- Exceeds 100 operations: Clear validation error before execution
- Mixed partition keys: Validation error preventing batch creation
Validation Errors (thrown before execution):
- Missing required `id` column
- More than 100 operations for a single partition key
All SDK Contribution checklist:
- [x] The pull request does not introduce [breaking changes]
- [ ] CHANGELOG is updated for new features, bug fixes or other significant changes.
- [x] I have read the contribution guidelines.
General Guidelines and Best Practices
- [x] Title of the pull request is clear and informative.
- [x] There are a small number of commits, each of which has an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, see this page.
Testing Guidelines
- [x] Pull request includes test coverage for the included changes.