Normalization (Snowflake) doesn't process multiple records with same primary key, same LSN (Postgres CDC source) and same emitted_at timestamp correctly
https://github.com/airbytehq/airbyte/issues/17955
OnCall issue https://github.com/airbytehq/oncall/issues/795
Postgres source v1.0.13, Snowflake destination v0.4.38, Airbyte v0.40.13
A user has reported an issue with Postgres CDC (wal2json) => Snowflake where an incremental update that shares a timestamp with the original insert does not appear in the final tables. The final tables show the original insert data, but not the update.
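For context, normalization deduplicates raw records by ranking them within each primary key, ordered by the CDC cursor columns, so when those columns tie the winner is arbitrary. A minimal sketch of that failure mode, assuming a dedup query of roughly this shape (illustrative table and column names, not the exact SQL dbt generates):

```sql
-- Sketch of the dedup step normalization performs (illustrative, not the
-- exact generated SQL). When the INSERT and UPDATE records share the same
-- _ab_cdc_lsn and _airbyte_emitted_at, the ORDER BY is a tie, the choice of
-- the row_num = 1 record is arbitrary, and the UPDATE can lose to the INSERT.
SELECT *
FROM _airbyte_raw_transactionlog
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY _airbyte_data:id
    ORDER BY _airbyte_data:_ab_cdc_lsn DESC, _airbyte_emitted_at DESC
) = 1;
```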
Steps to reproduce
Airbyte v0.40.14, Source Postgres v1.0.16, Destination Snowflake v0.4.38
also: Airbyte v0.40.10, Source Postgres v1.0.11, Destination Snowflake v0.4.38
- Inserted a row with a null column
- Updated the row immediately, adding a value to the null column
- Synced to Snowflake. Observe that the original INSERT and the subsequent UPDATE have the same timestamp in the raw table
- Observe that the UPDATE does not appear in the final table
The script I used for testing and the logs are below.
Script to reproduce the issue:
```python
import psycopg2

connection = None
cursor = None
try:
    connection = psycopg2.connect(user="postgres",
                                  password="postgres",
                                  host="127.0.0.1",
                                  port="5438",
                                  database="postgres")
    cursor = connection.cursor()

    # Insert a row with a NULL column...
    postgres_insert_query = """INSERT INTO wal_table (id, code) VALUES (%s, %s)"""
    record_to_insert = ('newrecord', None)
    cursor.execute(postgres_insert_query, record_to_insert)

    # ...then immediately update the same row, filling in the NULL column.
    postgres_update_query = """UPDATE wal_table SET code = 'update' WHERE id = 'newrecord'"""
    cursor.execute(postgres_update_query)

    # Commit both statements in one transaction so the CDC records carry
    # near-identical timestamps.
    connection.commit()
    count = cursor.rowcount
    print(count, "Changes successfully added to table")
except (Exception, psycopg2.Error) as error:
    print("Failed to add changes to table", error)
finally:
    # Closing database connection.
    if cursor:
        cursor.close()
    if connection:
        connection.close()
        print("PostgreSQL connection is closed")
```
Airbyte v0.40.14 logs: a421a961_5687_4cdd_94b8_f5888674c82f_logs_150_txt (1).txt
And my Airbyte v0.40.10 attempt logs: a421a961_5687_4cdd_94b8_f5888674c82f_logs_147_txt.txt
Current Behavior
- User is trying to sync a table including a `text`-type column that stores JSON. The column is updated very frequently, with very similar (or identical) timestamps due to use of the Transactional Outbox pattern.
- Initial sync looks OK:
```
>select _airbyte_emitted_at, transactionid from transactionlog where id = 220945937;
+-------------------------------+---------------+
| _AIRBYTE_EMITTED_AT           | TRANSACTIONID |
|-------------------------------+---------------|
| 2022-10-13 19:14:13.075 +0000 | NULL          |
+-------------------------------+---------------+
```
- The INSERT appears in both the raw table and the final table successfully. No issues there.
- An UPDATE to the row (usually a large JSON stored as text, overwriting the null column) appears in the raw table. Note that the emitted_at timestamps are identical:
```
>select _airbyte_emitted_at, _airbyte_data:transactionid from _AIRBYTE_RAW_TRANSACTIONLOG where _airbyte_data:id = 220945937 order by _airbyte_emitted_at;
+-------------------------------+-----------------------------+
| _AIRBYTE_EMITTED_AT           | _AIRBYTE_DATA:TRANSACTIONID |
|-------------------------------+-----------------------------|
| 2022-10-13 19:14:13.075 +0000 | null                        |
| 2022-10-13 19:14:13.075 +0000 | 116771931                   |
+-------------------------------+-----------------------------+
```
- However, the above UPDATE does not appear in the final table (see the tie-check query after this output):
```
>select _airbyte_emitted_at, transactionid from transactionlog where id = 220945937;
+-------------------------------+---------------+
| _AIRBYTE_EMITTED_AT           | TRANSACTIONID |
|-------------------------------+---------------|
| 2022-10-13 19:14:13.075 +0000 | NULL          |
+-------------------------------+---------------+
```
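To confirm the tie is the problem, a quick check against the raw table can list primary keys with multiple records sharing one `_airbyte_emitted_at`. A hedged sketch; the table name is from the example above, and the JSON path to the primary key should be adjusted for your schema:

```sql
-- Find primary keys whose raw records share an identical _airbyte_emitted_at,
-- i.e. the ties that the dedup ordering cannot resolve deterministically.
SELECT
    _airbyte_data:id AS pk,
    _airbyte_emitted_at,
    COUNT(*)         AS tied_records
FROM _AIRBYTE_RAW_TRANSACTIONLOG
GROUP BY 1, 2
HAVING COUNT(*) > 1;
```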
User Logs
On Call Summary
| Date | Start Status | Next Step | End Status |
|---|---|---|---|
| 2022-01-01 | New Issue | ... | Waiting |
Same issue: https://github.com/airbytehq/airbyte/issues/19801
Questions to figure out during grooming:
- If the source emits a unique `_airbyte_emitted_at` (using `.now()` per record instead of the time when the staging file is inserted into the `_raw` table), will that fix the issue even if `_ab_cdc_updated_at` is the same for both records (as it will be if the two operations are part of the same transaction)?
- Will the above work when we parallelize?
- Will using `.now()` instead of batch time for `_airbyte_emitted_at` cause other problems?
- If the above approaches do not work, can we add another counter that Debezium will add to each record? (See the sketch after this list.)
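If such a counter existed, the change to the dedup ordering would be mechanical. A hypothetical sketch, where `_ab_cdc_event_counter` is an invented column standing in for whatever monotonic per-record value Debezium could attach:

```sql
-- Hypothetical: _ab_cdc_event_counter is an invented per-record counter that
-- Debezium would attach; it breaks the tie when LSN and emitted_at are equal.
SELECT *
FROM _airbyte_raw_transactionlog
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY _airbyte_data:id
    ORDER BY
        _airbyte_data:_ab_cdc_lsn DESC,
        _airbyte_emitted_at DESC,
        _airbyte_data:_ab_cdc_event_counter DESC  -- invented tiebreaker
) = 1;
```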
@grishick: do we have an ETA on a solution here?
@mackro-rocky no ETA yet, but we are looking into more normalization improvements soon
@grishick @sh4sh - do we have any updates here? I've been implementing workarounds which aren't ideal!
Closing since Snowflake is on Destinations V2 now and normalization is gone.