azure-sdk-for-java icon indicating copy to clipboard operation
azure-sdk-for-java copied to clipboard

Add transactional batch support for Cosmos DB Spark connector

Open TheovanKraay opened this issue 2 months ago • 0 comments

Description

This PR adds transactional batch operation support to the Azure Cosmos DB Spark 3 connector, enabling atomic multi-document upsert operations within a single partition. The implementation leverages the standard bulk ingestion pipeline with a dedicated TransactionalBulkWriter/TransactionalBulkExecutor and configuration-based activation that ensures all-or-nothing execution semantics.

Key Features

  • Atomic Operations: All operations in a batch succeed or fail together (ACID guarantees via Cosmos DB transactional batch API)
  • Standard Bulk Pipeline: Reuses existing bulk write infrastructure with transactional semantics (no separate API endpoints)
  • ItemOverwrite Strategy: Uses ItemWriteStrategy.ItemOverwrite (upsert operations)
  • Configuration-Based: Enable via spark.cosmos.write.bulk.transactional=true configuration property
  • Hierarchical Partition Keys: Full support for 1-3 level hierarchical partition keys
  • 100-Operation Limit Enforcement: Automatically validates and enforces Cosmos DB's 100 operations per partition key limit
  • Automatic Partition Key Buffering: Groups consecutive operations by partition key and flushes batches on partition key changes
  • Spark 3.5 Optimization: Automatic data distribution and ordering via RequiresDistributionAndOrdering interface for optimal batching performance
  • Backward Compatibility: Works on Spark 3.3/3.4 (manual sorting recommended for best performance) and Spark 3.5+ (automatic sorting)
  • Simple Configuration: Enable via spark.cosmos.write.bulk.transactional=true

Usage Examples

Configuration

Transactional batch mode is enabled using the spark.cosmos.write.bulk.transactional configuration property:

# Enable transactional batch via configuration
writeConfig = {
    "spark.cosmos.write.bulk.transactional": "true",
    "spark.cosmos.write.bulk.enabled": "true",  # Bulk mode must be enabled
    # ... other cosmos config
}
df.write.format("cosmos.oltp").options(**writeConfig).save()

Example 1: Basic Upsert Operations

All operations in transactional batch mode use the ItemOverwrite write strategy (upsert):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define schema for transactional batch writes
schema = StructType([
    StructField("id", StringType(), False),
    StructField("pk", StringType(), False),
    StructField("name", StringType(), False),
    StructField("age", IntegerType(), False)
])

# Create DataFrame - all operations will be upserts
items = [
    ("item1", "partition1", "Alice", 30),
    ("item2", "partition1", "Bob", 25),
    ("item3", "partition1", "Charlie", 35)
]
df = spark.createDataFrame(items, schema)

# Execute transactional batch - all operations succeed or fail together
df.write \
    .format("cosmos.oltp") \
    .option("spark.cosmos.write.bulk.transactional", "true") \
    .option("spark.cosmos.write.bulk.enabled", "true") \
    .options(**cosmosConfig) \
    .save()

Note: For Spark 3.3/3.4 users, manually sort data by partition key for optimal performance:

# Recommended for Spark 3.3/3.4 (automatic in Spark 3.5+)
df.repartition("pk").sortWithinPartitions("pk").write \
    .format("cosmos.oltp") \
    .option("spark.cosmos.write.bulk.transactional", "true") \
    .option("spark.cosmos.write.bulk.enabled", "true") \
    .options(**cosmosConfig) \
    .save()

Example 2: Financial Instrument Temporal Versioning (Hierarchical Partition Keys)

This example demonstrates hierarchical partition keys for temporal versioning of financial instruments. The partition key consists of two levels: PermId (instrument identifier) and SourceId (data source):

from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Define schema with hierarchical partition key: PermId (level 1) + SourceId (level 2)
schema = StructType([
    StructField("id", StringType(), False),
    StructField("PermId", StringType(), False),      # Partition key level 1
    StructField("SourceId", StringType(), False),    # Partition key level 2
    StructField("ValidFrom", TimestampType(), False),
    StructField("ValidTo", TimestampType(), True),
    StructField("Price", DoubleType(), False),
    StructField("Currency", StringType(), False)
])

# Temporal update: close old record and create new version atomically
# Both operations use the same hierarchical partition key [PermId="MSFT", SourceId="Bloomberg"]
operations = [
    # Close the current active record by setting ValidTo
    ("inst-msft-v1", "MSFT", "Bloomberg", "2024-01-01", "2024-12-01", 100.50, "USD"),
    # Create new version with updated price
    ("inst-msft-v2", "MSFT", "Bloomberg", "2024-12-01", None, 105.75, "USD")
]
df = spark.createDataFrame(operations, schema)

# Execute atomic temporal update - both succeed or both fail
df.write \
    .format("cosmos.oltp") \
    .option("spark.cosmos.write.bulk.transactional", "true") \
    .option("spark.cosmos.write.bulk.enabled", "true") \
    .options(**cosmosConfig) \
    .save()

Note: In this example, PermId and SourceId together form the hierarchical partition key (2 levels). All operations in the same batch must share the same partition key values to maintain atomicity. All operations use ItemOverwrite write strategy.

Input DataFrame Schema

Your DataFrame should have flat columns representing document properties:

Column Type Required Description
id String Yes Document identifier
pk (or partition key path) String/Multiple Yes Partition key value(s) - supports hierarchical keys
...additional columns... Any No Document properties (converted to JSON)

Note: For hierarchical partition keys, include all partition key path columns (e.g., PermId, SourceId). All operations use ItemOverwrite write strategy.

Implementation Architecture

Standard Bulk Pipeline Integration

The transactional batch feature integrates seamlessly with the existing bulk write infrastructure:

  1. Configuration Detection (ItemsWriterBuilder):

    • Detects spark.cosmos.write.bulk.transactional=true configuration
    • Automatically instantiates TransactionalBulkWriter instead of BulkWriter
  2. Partition Key Buffering (TransactionalBulkWriter):

    • Receives operations from Spark partitions
    • Emits operations to TransactionalBulkExecutor reactor pipeline
  3. Batch Grouping (TransactionalBulkExecutor):

    • Groups consecutive operations by partition key using bufferUntil() operator
    • Flushes batch when partition key changes or 100-operation limit reached
    • Creates CosmosBatch with upsertItemOperation() calls (ItemOverwrite strategy)
  4. Atomic Execution:

    • Sends batch to Cosmos DB via executeBatchRequest() API with atomic semantics
    • All operations succeed or fail together (no partial success)

Spark Version Compatibility

Spark Version Status Distribution/Ordering Performance
Spark 3.3 ✅ Supported Manual (user must sort) Good (with manual sorting)
Spark 3.4 ✅ Supported Manual (user must sort) Good (with manual sorting)
Spark 3.5+ ✅ Supported Automatic via RequiresDistributionAndOrdering Optimal (automatic sorting)

Why it works on all Spark versions:

  • The TransactionalBulkExecutor performs partition key buffering at the writer level, independent of Spark's distribution/ordering
  • Spark 3.3/3.4: Feature works correctly, but users should manually sort data by partition key for optimal batching
  • Spark 3.5+: RequiresDistributionAndOrdering interface automatically instructs Spark to repartition and sort data by partition key columns, ensuring consecutive operations share the same partition key for maximum batch efficiency

Performance Impact:

  • Without sorting (Spark 3.3/3.4 without manual sort): Many small batches (1-2 operations each)
  • With sorting (Spark 3.3/3.4 with manual sort OR Spark 3.5+ automatic): Optimal batches (up to 100 operations each)
  • Recommendation: Always sort by partition key on Spark 3.3/3.4 for best performance

Constraints

  • ItemOverwrite Only: Only ItemWriteStrategy.ItemOverwrite (upsert) is supported in transactional batch mode
  • Same Partition Key: All operations in a batch must target the same partition key value(s)
  • 100 Operation Limit: Batches cannot exceed 100 operations per partition key (enforced with clear error messages)
  • Atomicity: All operations succeed or fail together within each batch (no partial success)
  • 2MB Size Limit: Total batch payload cannot exceed 2MB
  • Hierarchical Keys: Supports up to 3-level hierarchical partition keys
  • Bulk Write Required: Must have spark.cosmos.write.bulk.enabled=true (enabled by default)

Error Handling

Transactional batch operations follow all-or-nothing semantics:

# Any failure in the batch causes all operations to rollback
try:
    df.write \
        .format("cosmos.oltp") \
        .option("spark.cosmos.write.bulk.transactional", "true") \
        .option("spark.cosmos.write.bulk.enabled", "true") \
        .options(**cosmosConfig) \
        .save()
except Exception as e:
    # Batch failed - no operations were committed
    print(f"Transaction failed: {e}")

Common Error Scenarios:

  • 400 Bad Request: Invalid document structure or exceeds size limits
  • Exceeds 100 operations: Clear validation error before execution
  • Mixed partition keys: Validation error preventing batch creation

Validation Errors (thrown before execution):

  • Missing required id column
  • More than 100 operations for a single partition key

All SDK Contribution checklist:

  • [x] The pull request does not introduce [breaking changes]
  • [ ] CHANGELOG is updated for new features, bug fixes or other significant changes.
  • [x] I have read the contribution guidelines.

General Guidelines and Best Practices

  • [x] Title of the pull request is clear and informative.
  • [x] There are a small number of commits, each of which have an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, see this page.

Testing Guidelines

  • [x] Pull request includes test coverage for the included changes.

TheovanKraay avatar Dec 06 '25 14:12 TheovanKraay