
Normalization (Snowflake) doesn't process multiple records with same primary key, same LSN (Postgres CDC source) and same emitted_at timestamp correctly

Open etsybaev opened this issue 3 years ago • 4 comments

https://github.com/airbytehq/airbyte/issues/17955

OnCall issue https://github.com/airbytehq/oncall/issues/795

Postgres source v1.0.13, Snowflake destination v0.4.38, Airbyte v0.40.13

A user has reported an issue with Postgres CDC (wal2json) => Snowflake where incremental updates that share a timestamp with the original insert do not appear in the final tables. The final tables show the original insert data, but not the update.
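
For context, the dedup step in normalization boils down to keeping one row per primary key under some ordering. The query below is a simplified illustration in Snowflake SQL, not the exact dbt SQL that normalization generates, and it uses the raw table from the report further down. The point it shows: when every ordering column ties (same primary key, same LSN, same _airbyte_emitted_at), ROW_NUMBER() picks a winner arbitrarily, so the UPDATE record can lose to the earlier INSERT record.

select *
from _AIRBYTE_RAW_TRANSACTIONLOG
qualify row_number() over (
    partition by _airbyte_data:id
    order by
        _airbyte_data:_ab_cdc_lsn desc,   -- ties for the INSERT/UPDATE pair
        _airbyte_emitted_at desc          -- also ties, so the pick is arbitrary
) = 1;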

Steps to reproduce

Airbyte v0.40.14, Source Postgres v1.0.16, Destination Snowflake v0.4.38

Also: Airbyte v0.40.10, Source Postgres v1.0.11, Destination Snowflake v0.4.38

  1. Inserted a row with a null column
  2. Updated the row immediately, adding a value to the null column
  3. Synced to Snowflake. Observe that the original INSERT and the subsequent UPDATE have the same timestamp in the raw table
  4. Observe that the UPDATE does not appear in the final table

The script I used for testing and the logs are below.

Script to reproduce issue
import psycopg2

connection = None
try:
    connection = psycopg2.connect(user="postgres",
                                  password="postgres",
                                  host="127.0.0.1",
                                  port="5438",
                                  database="postgres")
    cursor = connection.cursor()

    # Insert a row whose "code" column is NULL.
    postgres_insert_query = """INSERT INTO wal_table (id, code) VALUES (%s, %s)"""
    record_to_insert = ('newrecord', None)
    cursor.execute(postgres_insert_query, record_to_insert)

    # Immediately update the same row to fill in the NULL column.
    postgres_update_query = """UPDATE wal_table SET code = 'update' WHERE id = 'newrecord'"""
    cursor.execute(postgres_update_query)

    # Both changes are committed in a single transaction.
    connection.commit()
    count = cursor.rowcount
    print(count, "Changes successfully added to table")

except (Exception, psycopg2.Error) as error:
    print("Failed to add changes to table", error)

finally:
    # Close the database connection.
    if connection:
        cursor.close()
        connection.close()
        print("PostgreSQL connection is closed")

Airbyte v0.40.14 logs: a421a961_5687_4cdd_94b8_f5888674c82f_logs_150_txt (1).txt

And my Airbyte v0.40.10 attempt logs: a421a961_5687_4cdd_94b8_f5888674c82f_logs_147_txt.txt

Current Behavior

  1. User is trying to sync a table that includes a text-type column storing JSON. The column is updated very frequently, with very similar (or identical) timestamps due to the use of the transactional outbox pattern.
  2. Initial sync looks ok:
>select _airbyte_emitted_at, transactionid from transactionlog where id = 220945937;
+-------------------------------+---------------+
| _AIRBYTE_EMITTED_AT           | TRANSACTIONID |
|-------------------------------+---------------|
| 2022-10-13 19:14:13.075 +0000 |          NULL |
+-------------------------------+---------------+
  3. INSERT row appears in the raw table and the final table successfully. No issues there.
  4. UPDATE to the row (usually a large JSON stored as text, overwriting the null column) appears in the raw table. Note that the timestamps are identical:
>select _airbyte_emitted_at, _airbyte_data:transactionid from _AIRBYTE_RAW_TRANSACTIONLOG where _airbyte_data:id = 220945937 order by _airbyte_emitted_at;
+-------------------------------+-----------------------------+
| _AIRBYTE_EMITTED_AT           | _AIRBYTE_DATA:TRANSACTIONID |
|-------------------------------+-----------------------------|
| 2022-10-13 19:14:13.075 +0000 | null                        |
| 2022-10-13 19:14:13.075 +0000 | 116771931                   |
+-------------------------------+-----------------------------+
  5. However, the above UPDATE does not appear in the final table:
>select _airbyte_emitted_at, transactionid from transactionlog where id = 220945937;
+-------------------------------+---------------+
| _AIRBYTE_EMITTED_AT           | TRANSACTIONID |
|-------------------------------+---------------|
| 2022-10-13 19:14:13.075 +0000 |          NULL |
+-------------------------------+---------------+
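
Any primary key that has multiple raw records sharing a single _airbyte_emitted_at is at risk of this behavior. A query along these lines, written against the raw table shown above, surfaces the affected keys:

select _airbyte_data:id as id, _airbyte_emitted_at, count(*) as copies
from _AIRBYTE_RAW_TRANSACTIONLOG
group by 1, 2
having count(*) > 1;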

User Logs

logs-17955.zip

On Call Summary

Date       | Start Status | Next Step | End Status
2022-01-01 | New Issue    | ...       | Waiting

etsybaev • Dec 14 '22 17:12

Same issue https://github.com/airbytehq/airbyte/issues/19801

sh4sh • Dec 14 '22 19:12

Questions to figure out during grooming:

  • If the source emits a unique ab_emitted_at per record (using .now() per record instead of the time the staging file is inserted into the _raw table), will that fix the issue even when _ab_cdc_updated_at is the same for both records (as it will be when the two operations are part of the same transaction)?
  • Will the above still work when we parallelize?
  • Will using .now() instead of the batch time for ab_emitted_at cause other problems?
  • If the above approaches do not work, can we add another counter that Debezium attaches to each record? (A sketch of how such a tie-breaker could slot into the dedup ordering follows below.)
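
For illustration only, here is how a per-record counter could break the tie deterministically. The _ab_cdc_counter field below is hypothetical, not an existing Airbyte or Debezium field, and the real normalization SQL is generated by dbt, so treat this as a sketch of the idea rather than a proposed implementation:

select *
from _AIRBYTE_RAW_TRANSACTIONLOG
qualify row_number() over (
    partition by _airbyte_data:id
    order by
        _airbyte_emitted_at desc,
        _airbyte_data:_ab_cdc_counter::number desc  -- hypothetical tie-breaker
) = 1;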

grishick • Dec 15 '22 17:12

@grishick : do we have an ETA on a solution here?

mackro-rocky • Jan 27 '23 21:01

@mackro-rocky no ETA yet, but we are looking into more normalization improvements soon

grishick • Jan 27 '23 21:01

@grishick @sh4sh - do we have any updates here? I've been implementing workarounds which aren't ideal!

mackro-rocky • Mar 09 '23 16:03

Closing since Snowflake is on Destinations V2 now and normalization is gone.

cynthiaxyin • Nov 22 '23 23:11