
Silent Integer Overflow in AVRO Schema Evolution

Open KasparMetsa opened this issue 5 months ago • 0 comments

Steps to Reproduce

cd /Users/km/dev/ksml/docs/local-docker-compose-setup-with-sr

Phase 1: Produce Messages with Schema v2 (sequence field as long with large values)

Step 1.1: Configure Schema v2 (sequence field as long)

Edit examples/SensorData.avsc and add a sequence field as long after the timestamp field:

{
  "namespace": "io.axual.ksml.example",
  "doc": "Emulated sensor data with a few additional attributes",
  "name": "SensorData",
  "type": "record",
  "fields": [
    {
      "doc": "The name of the sensor",
      "name": "name",
      "type": "string"
    },
    {
      "doc": "The timestamp of the sensor reading",
      "name": "timestamp",
      "type": "long"
    },
    {
      "doc": "The sequence number of this reading",
      "name": "sequence",
      "type": "long"
    },
    {
      "doc": "The value of the sensor, represented as string",
      "name": "value",
      "type": "string"
    },
    {
      "doc": "The type of the sensor",
      "name": "type",
      "type": {
        "name": "SensorType",
        "type": "enum",
        "doc": "The type of a sensor",
        "symbols": ["AREA", "HUMIDITY", "LENGTH", "STATE", "TEMPERATURE"]
      }
    },
    {
      "doc": "The unit of measurement",
      "name": "unit",
      "type": "string"
    },
    {
      "doc": "The color of the sensor",
      "name": "color",
      "type": ["null", "string"],
      "default": null
    },
    {
      "doc": "The city of the sensor",
      "name": "city",
      "type": ["null", "string"],
      "default": null
    },
    {
      "doc": "The owner of the sensor",
      "name": "owner",
      "type": ["null", "string"],
      "default": null
    }
  ]
}

Schema v2 has 9 fields with sequence as long (64-bit integer).

Step 1.2: Update Producer to Include sequence Field (as long with large values)

Edit examples/phase1-producer-only.yaml and modify the generator function:

functions:
  generate_sensordata_message:
    type: generator
    globalCode: |
      import time
      import random
      messageCount = 0
    code: |
      global messageCount
      if messageCount >= 10:
        log.info("Producer stopped (count: 10 reached)")
        return None

      key = "sensor" + str(messageCount)
      messageCount += 1

      sensorTypes = ["TEMPERATURE", "HUMIDITY", "LENGTH", "AREA", "STATE"]
      units = ["C", "F", "m", "cm", "Pa"]
      colors = ["red", "blue", "green", None]
      cities = ["Amsterdam", "Rotterdam", "Utrecht", None]
      owners = ["Alice", "Bob", "Charlie", None]

      # Generate large long values that exceed int MAX (2,147,483,647)
      # Using values like 3,000,000,000 + messageCount
      largeSequence = 3000000000 + messageCount

      result = (key, {
        "name": key,
        "timestamp": int(time.time() * 1000),
        "sequence": largeSequence,  # long value > int MAX
        "value": str(round(random.uniform(10.0, 30.0), 2)),
        "type": random.choice(sensorTypes),
        "unit": random.choice(units),
        "color": random.choice(colors),
        "city": random.choice(cities),
        "owner": random.choice(owners)
      })

      log.info("========================================")
      log.info("PHASE 1: Producing message {}/10 with schema v2", messageCount)
      log.info("Key: {}", key)
      log.info("Fields (v2): sequence as LONG")
      log.info("Sequence value: {} (long, EXCEEDS int MAX 2,147,483,647)", largeSequence)
      log.info("========================================")

      time.sleep(2)
    expression: result
    resultType: (string, avro:SensorData)

producers:
  sensordata_producer:
    generator: generate_sensordata_message
    count: 10
    interval: 2s
    to:
      topic: sensor_data_avro
      keyType: string
      valueType: avro:SensorData

CRITICAL: The sequence values (3,000,000,001 to 3,000,000,010) exceed int's maximum value of 2,147,483,647!
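The arithmetic behind this note can be checked directly. A standalone sketch (not KSML code) using the same formula as the generator above:

```java
public class SequenceRangeCheck {
    public static void main(String[] args) {
        // Same formula as the generator: 3,000,000,000 + messageCount (1..10)
        for (int messageCount = 1; messageCount <= 10; messageCount++) {
            long largeSequence = 3_000_000_000L + messageCount;
            // Every value exceeds int's maximum of 2,147,483,647
            System.out.println(largeSequence + " > Integer.MAX_VALUE? "
                    + (largeSequence > Integer.MAX_VALUE));
        }
    }
}
```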

Step 1.3: Configure for Phase 1

Edit examples/ksml-runner.yaml:

ksml:
  definitions:
    # PHASE 1: Uncomment this line
    phase1: phase1-producer-only.yaml

    # PHASE 2: Keep this commented
    #phase2: phase2-processor-only.yaml

    # DEFAULT: Comment these out
    #producer: producer.yaml
    #processor: processor.yaml

Step 1.4: Start Docker Compose

docker compose up -d && docker compose logs ksml -f

Step 1.5: Watch Producer Generate 10 Messages

You'll see output like:

========================================
PHASE 1: Producing message 1/10 with schema v2
Key: sensor0
Fields (v2): sequence as LONG
Sequence value: 3000000001 (long, EXCEEDS int MAX 2,147,483,647)
========================================

... (messages 2-9) ...

========================================
PHASE 1: Producing message 10/10 with schema v2
Key: sensor9
Fields (v2): sequence as LONG
Sequence value: 3000000010 (long, EXCEEDS int MAX 2,147,483,647)
========================================

Producer stopped (count: 10 reached)

Time: 20 seconds (2 seconds per message)

Step 1.6: Stop KSML

docker compose stop ksml

IMPORTANT: Stop KSML but keep Kafka and Schema Registry running!

Phase 1 Complete: 10 messages with schema v2 (sequence as long with values > int MAX) are in Kafka


Interlude: Demote Schema (Change long to int)

Step 2.1: Demote sequence Field from long to int

Edit examples/SensorData.avsc and change the sequence field type from long to int:

{
  "namespace": "io.axual.ksml.example",
  "doc": "Emulated sensor data with a few additional attributes",
  "name": "SensorData",
  "type": "record",
  "fields": [
    {
      "doc": "The name of the sensor",
      "name": "name",
      "type": "string"
    },
    {
      "doc": "The timestamp of the sensor reading",
      "name": "timestamp",
      "type": "long"
    },
    {
      "doc": "The sequence number of this reading",
      "name": "sequence",
      "type": "int"
    },
    {
      "doc": "The value of the sensor, represented as string",
      "name": "value",
      "type": "string"
    },
    {
      "doc": "The type of the sensor",
      "name": "type",
      "type": {
        "name": "SensorType",
        "type": "enum",
        "doc": "The type of a sensor",
        "symbols": ["AREA", "HUMIDITY", "LENGTH", "STATE", "TEMPERATURE"]
      }
    },
    {
      "doc": "The unit of measurement",
      "name": "unit",
      "type": "string"
    },
    {
      "doc": "The color of the sensor",
      "name": "color",
      "type": ["null", "string"],
      "default": null
    },
    {
      "doc": "The city of the sensor",
      "name": "city",
      "type": ["null", "string"],
      "default": null
    },
    {
      "doc": "The owner of the sensor",
      "name": "owner",
      "type": ["null", "string"],
      "default": null
    }
  ]
}

CHANGED: `"type": "long"` → `"type": "int"` for the sequence field

Step 2.2: Configure for Phase 2

Edit examples/ksml-runner.yaml:

ksml:
  definitions:
    # PHASE 1: Comment this out
    #phase1: phase1-producer-only.yaml

    # PHASE 2: Uncomment this line
    phase2: phase2-processor-only.yaml

    # DEFAULT: Keep these commented
    #producer: producer.yaml
    #processor: processor.yaml

Schema Change: v2 (sequence as long) → v1 (sequence as int)
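Per the Avro specification's schema-resolution rules, a writer's `int` is promotable to a reader's `long`, `float`, or `double`, but demotion is not allowed, so a conforming reader should refuse to resolve long-written data against an int reader schema. A minimal sketch encoding just that rule (this is an illustration of the spec's promotion table, not Avro's actual resolver):

```java
import java.util.List;
import java.util.Map;

public class PromotionRule {
    // Numeric promotions allowed by Avro schema resolution (writer -> readers)
    static final Map<String, List<String>> PROMOTABLE = Map.of(
            "int", List.of("long", "float", "double"),
            "long", List.of("float", "double"),
            "float", List.of("double"));

    static boolean resolvable(String writer, String reader) {
        return writer.equals(reader)
                || PROMOTABLE.getOrDefault(writer, List.of()).contains(reader);
    }

    public static void main(String[] args) {
        System.out.println(resolvable("int", "long"));  // true: int promotes to long
        System.out.println(resolvable("long", "int"));  // false: demotion is not allowed
    }
}
```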


Phase 2: Read v2 Data with v1 Schema (Expected to Fail)

Step 3.1: Start KSML

docker compose start ksml && docker compose logs ksml -f

Step 3.2: Expected vs. Actual Behavior

Expected Outcome: an AVRO deserialization error, since resolving long-written data against an int reader schema is not a valid Avro promotion.

Actual Outcome: no error. KSML silently narrows the long values to int, and messages like this get produced:

{
    "sequence": -1294967286,
    "unit": "cm",
    "owner": {
        "string": "Bob"
    },
    "name": "SENSOR9",
    "type": "TEMPERATURE",
    "timestamp": 1763602592011,
    "city": {
        "string": "Rotterdam"
    },
    "color": {
        "string": "blue"
    },
    "value": "16.51"
}
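The corrupted `sequence` above is exactly what Java's narrowing long-to-int conversion produces: the top 32 bits are discarded, which for these values amounts to subtracting 2^32. A standalone sketch using the value from the logs above:

```java
public class WrapDemo {
    public static void main(String[] args) {
        long produced = 3_000_000_010L;            // sequence written with schema v2
        int narrowed = (int) produced;             // silent two's-complement truncation
        System.out.println(narrowed);              // -1294967286, matching the message above
        // Equivalent arithmetic: subtract 2^32 once the value exceeds Integer.MAX_VALUE
        System.out.println(produced - (1L << 32)); // -1294967286
    }
}
```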

Proposed Fix

Add overflow detection in AvroDataObjectMapper.java:

case INT -> {
    if (value instanceof DataInteger v) return v.value();
    if (value instanceof DataByte v)
        return v.value() != null ? v.value().intValue() : null;
    if (value instanceof DataShort v)
        return v.value() != null ? v.value().intValue() : null;

    // ADD THIS:
    if (value instanceof DataLong v) {
        if (v.value() == null) return null;
        long longVal = v.value();

        if (longVal < Integer.MIN_VALUE || longVal > Integer.MAX_VALUE) {
            throw new DataException(
                "Value " + longVal + " exceeds INT range [" +
                Integer.MIN_VALUE + ", " + Integer.MAX_VALUE + "]. " +
                "Use 'long' type in schema or ensure values fit in INT range."
            );
        }

        return (int) longVal;
    }

    return fromDataObject(value);
}
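An alternative to the hand-rolled bounds check is `Math.toIntExact`, which performs the same range test and throws `ArithmeticException` on overflow; whether KSML should surface that exception or wrap it in the custom `DataException` above is a design choice. A standalone sketch:

```java
public class ToIntExactDemo {
    public static void main(String[] args) {
        System.out.println(Math.toIntExact(42L)); // fits in int range: prints 42
        try {
            Math.toIntExact(3_000_000_010L);      // out of int range
        } catch (ArithmeticException e) {
            System.out.println("overflow detected: " + e.getMessage());
        }
    }
}
```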

Apply Similar Fix For:

  • FLOAT: Add DataDouble validation (precision loss detection)
  • SHORT: Add DataInteger/DataLong validation (overflow detection)
  • BYTE: Add DataShort/DataInteger/DataLong validation (overflow detection)

KasparMetsa, Nov 20 '25 02:11