Implement persons-on-events async migration
Is your feature request related to a problem?
We're nearing completion of the ingestion- and query-related changes for the persons-on-events project. This ticket outlines the plan for migrating to that schema by default on cloud and self-hosted.
Current state
@yakkomajuri implemented `posthog/management/commands/backfill_persons_and_groups_on_events.py`, however the script has various issues:
- It runs all queries `ON CLUSTER`, which would cause issues for us on cloud due to backup nodes and other racy behaviors.
- The queries are incorrect: lookups should be prefixed with team_ids and group_type_ids in the appropriate dictionaries.
- The queries are incorrect: they do not handle business logic around person/group updates or deletions.
- The queries are inconsistent: two subsequent events might get e.g. invalid person_id or person_properties if the UPDATE query hits them at different times or depending on caching behavior.
- The schema has evolved: created_at columns are missing from the current backfill script.
- Caching-related work needs to happen: team-by-team backfills took way too long otherwise. Tuning GRANULARITY and making sure we're making use of sorting will also help here.
We're in a reasonably good state on cloud where we have ~60% of disk free to do this migration as needed.
One complicating factor is cloud infrastructure migration: Ideally we start this migration once posthog-worker has been moved to new infrastructure.
Describe the plan
- [x] Decide whether to migrate to JSON data type at the same time. A: No, see https://github.com/PostHog/posthog/issues/10506 for details
- [x] Measure how much extra disk will be used by the new columns in practice.
- [x] Measure compression options & query performance
- [x] Make shards individually addressable on cloud (to avoid using `ON CLUSTER` queries), and support this in async migrations:
  - On cloud, based on an env var, the async migration will query the `system.clusters` table to get a representative node for each shard and use that in queries (see the sketch after this list).
  - Other environments will run queries `ON CLUSTER` instead.
- [x] Update the async migration.
  - See the section above for the correctness issues which need to be resolved.
  - We'll copy a snapshot of persons and groups data to a separate table, removing deleted/duplicate rows to make the copy consistent. We'll avoid writing over data that's already filled.
- [x] Test and tune caching-related settings on a subset of data
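As a reference, a minimal sketch of the per-shard node lookup mentioned above — the cluster name `posthog` and picking `any(host_name)` as the representative replica are assumptions, not the final implementation:

```sql
-- Pick one addressable replica per shard instead of running queries ON CLUSTER.
SELECT
    shard_num,
    any(host_name) AS representative_host
FROM system.clusters
WHERE cluster = 'posthog'
GROUP BY shard_num
ORDER BY shard_num
```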
You can find a WIP version of the backfill SQL here:
-- Copying tables to a more optimal schema
-- Speed up copying data
set max_block_size=100000, max_insert_block_size=100000, max_threads=20, max_insert_threads=20, optimize_on_insert=0, max_execution_time=0
-- person
CREATE TABLE tmp_person_0006
(
`id` UUID,
`created_at` DateTime64(3),
`team_id` Int64,
`properties` String,
`is_deleted` Int8 DEFAULT 0,
`version` UInt64
)
ENGINE = ReplacingMergeTree(version)
ORDER BY (team_id, id)
SETTINGS index_granularity = 128 -- Faster lookups!
INSERT INTO tmp_person_0006(id, created_at, team_id, properties, is_deleted, version)
SELECT id, created_at, team_id, properties, is_deleted, version FROM person
OPTIMIZE TABLE tmp_person_0006 FINAL DEDUPLICATE -- Set mutations_sync so this completes before backfill begins
ALTER TABLE tmp_person_0006 DELETE WHERE is_deleted -- Set mutations_sync so this completes before backfill begins
-- person_distinct_id
CREATE TABLE tmp_person_distinct_id2_0006
(
`team_id` Int64,
`distinct_id` String,
`person_id` UUID,
`is_deleted` Int8,
`version` Int64 DEFAULT 1
)
ENGINE = ReplacingMergeTree(version)
ORDER BY (team_id, distinct_id)
SETTINGS index_granularity = 128 -- Faster lookups!
INSERT INTO tmp_person_distinct_id2_0006(team_id, distinct_id, person_id, is_deleted, version)
SELECT team_id, distinct_id, person_id, is_deleted, version FROM person_distinct_id2
OPTIMIZE TABLE tmp_person_distinct_id2_0006 FINAL DEDUPLICATE -- Set mutations_sync so this completes before backfill begins
ALTER TABLE tmp_person_distinct_id2_0006 DELETE WHERE is_deleted -- Set mutations_sync so this completes before backfill begins
-- groups
CREATE TABLE tmp_groups_0006
(
`group_type_index` UInt8,
`group_key` String,
`created_at` DateTime64(3),
`team_id` Int64,
`group_properties` String,
`_timestamp` DateTime
)
ENGINE = ReplacingMergeTree(_timestamp)
ORDER BY (team_id, group_type_index, group_key)
SETTINGS index_granularity = 128 -- Faster lookups!
INSERT INTO tmp_groups_0006(group_type_index, group_key, created_at, team_id, group_properties, _timestamp)
SELECT group_type_index, group_key, created_at, team_id, group_properties, _timestamp FROM groups
OPTIMIZE TABLE tmp_groups_0006 FINAL DEDUPLICATE -- Set mutations_sync so this completes before backfill begins
-- Create dictionaries
CREATE DICTIONARY IF NOT EXISTS groups_dict
(
team_id Int64,
group_type_index UInt8,
group_key String,
group_properties String,
created_at DateTime
)
PRIMARY KEY team_id, group_type_index, group_key
SOURCE(CLICKHOUSE(TABLE groups DB 'posthog' USER 'default'))
LAYOUT(complex_key_cache(size_in_cells 10000))
Lifetime(60000)
CREATE DICTIONARY IF NOT EXISTS person_distinct_id2_dict
(
team_id Int64,
distinct_id String,
person_id UUID
)
PRIMARY KEY team_id, distinct_id
SOURCE(CLICKHOUSE(TABLE person_distinct_id2 DB 'posthog'))
LAYOUT(complex_key_cache(size_in_cells 10000))
DROP DICTIONARY person_dict ON CLUSTER 'posthog'
CREATE DICTIONARY IF NOT EXISTS person_dict
(
team_id Int64,
id UUID,
properties String,
created_at DateTime
)
PRIMARY KEY team_id, id
SOURCE(CLICKHOUSE(TABLE person DB 'posthog'))
LAYOUT(complex_key_cache(size_in_cells 10000))
-- Backfill
-- Verification query
SELECT
*,
toUUID(dictGet('person_distinct_id2_dict', 'person_id', tuple(team_id, distinct_id))) as real_person_id,
dictGetString('person_dict', 'properties', tuple(team_id, toUUID(dictGet('person_distinct_id2_dict', 'person_id', tuple(team_id, distinct_id))))) AS real_person_properties,
dictGetDateTime('person_dict', 'created_at', tuple(team_id, toUUID(dictGet('person_distinct_id2_dict', 'person_id', tuple(team_id, distinct_id))))) AS real_person_created_at,
$group_0,
dictGetString('groups_dict', 'group_properties', tuple(team_id, 0, $group_0)) AS real_group_0_props,
dictGetDateTime('groups_dict', 'created_at', tuple(team_id, 0, $group_0)) AS real_group_0_created_at,
$group_1,
dictGetString('groups_dict', 'group_properties', tuple(team_id, 1, $group_1)) AS real_group_1_props,
dictGetDateTime('groups_dict', 'created_at', tuple(team_id, 1, $group_1)) AS real_group_1_created_at,
$group_2,
dictGetString('groups_dict', 'group_properties', tuple(team_id, 2, $group_2)) AS real_group_2_props,
dictGetDateTime('groups_dict', 'created_at', tuple(team_id, 2, $group_2)) AS real_group_2_created_at,
$group_3,
dictGetString('groups_dict', 'group_properties', tuple(team_id, 3, $group_3)) AS real_group_3_props,
dictGetDateTime('groups_dict', 'created_at', tuple(team_id, 3, $group_3)) AS real_group_3_created_at,
$group_4,
dictGetString('groups_dict', 'group_properties', tuple(team_id, 4, $group_4)) AS real_group_4_props,
dictGetDateTime('groups_dict', 'created_at', tuple(team_id, 4, $group_4)) AS real_group_4_created_at
FROM sharded_events
WHERE team_id = 2 AND $group_0 != ''
LIMIT 10
ALTER TABLE sharded_events
UPDATE
person_id=toUUID(dictGet('person_distinct_id2_dict', 'person_id', tuple(team_id, distinct_id))),
person_properties=dictGetString('person_dict', 'properties', tuple(team_id, toUUID(dictGet('person_distinct_id2_dict', 'person_id', tuple(team_id, distinct_id))))),
person_created_at=dictGetDateTime('person_dict', 'created_at', tuple(team_id, toUUID(dictGet('person_distinct_id2_dict', 'person_id', tuple(team_id, distinct_id))))),
group0_properties=dictGetString('groups_dict', 'group_properties', tuple(team_id, 0, $group_0)),
group1_properties=dictGetString('groups_dict', 'group_properties', tuple(team_id, 1, $group_1)),
group2_properties=dictGetString('groups_dict', 'group_properties', tuple(team_id, 2, $group_2)),
group3_properties=dictGetString('groups_dict', 'group_properties', tuple(team_id, 3, $group_3)),
group4_properties=dictGetString('groups_dict', 'group_properties', tuple(team_id, 4, $group_4)),
group0_created_at=dictGetDateTime('groups_dict', 'created_at', tuple(team_id, 0, $group_0)),
group1_created_at=dictGetDateTime('groups_dict', 'created_at', tuple(team_id, 1, $group_1)),
group2_created_at=dictGetDateTime('groups_dict', 'created_at', tuple(team_id, 2, $group_2)),
group3_created_at=dictGetDateTime('groups_dict', 'created_at', tuple(team_id, 3, $group_3)),
group4_created_at=dictGetDateTime('groups_dict', 'created_at', tuple(team_id, 4, $group_4))
WHERE 1 = 1; -- ALTER ... UPDATE requires a WHERE clause; the real migration should restrict this to rows that haven't been filled yet
Ideally we will be able to build, test and ship this as part of release 1.38.0 - at least as an experimental migration.
Work left out of this async migration
- Resharding the events table to be sharded by `person_id`. This is vital, but also tricky to implement at the same time.
- Supporting querying single shards on self-hosted and self-hosted with external setups. The assumption is that these setups are small enough right now not to run into problems with `ON CLUSTER` queries.
Thank you for your feature request – we love each and every one!
Thanks for this! Very well thought-out and explained.
Feedback on approach
- Using temporary tables ensures consistency and we can manipulate things (e.g. index granularity)
- `DROP DICTIONARY` -> `DROP DICTIONARY IF EXISTS` (although I think this is just what lingered from your cloud testing)
- Given we're now on 22.3, we can get rid of `tuple` when querying dictionaries (see https://github.com/ClickHouse/ClickHouse/pull/26130), although it seems we still allow versions down to 21.6 for self-hosted folks (side q: should we change this?)
- `OPTIMIZE TABLE` might not behave as expected even when we set `mutations_sync`, so we need to test this carefully when writing the async migration (see the sketch below). Unsure if this is fixed on more recent versions, but essentially the server doesn't send anything back to the client and the client times out the connection with an error even if we increase all relevant timeouts (but the OPTIMIZE then goes on to complete successfully). Context here.
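A minimal sketch of the synchronisation settings in question, reusing the temporary table names from the WIP SQL above; whether this is enough to make `OPTIMIZE` behave is exactly what needs testing:

```sql
-- Make ALTER ... DELETE mutations block until they complete on all replicas.
SET mutations_sync = 2;

-- Deduplicate the snapshot, then remove deleted rows before the backfill begins.
-- Note: the client connection may still time out while the OPTIMIZE keeps running
-- server-side, so completion may need to be confirmed via system.merges / system.mutations.
OPTIMIZE TABLE tmp_person_0006 FINAL DEDUPLICATE;
ALTER TABLE tmp_person_0006 DELETE WHERE is_deleted;
```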
Did a bunch of benchmarking on this today. Q/A:
Should we cache, how much?
TL;DR: Yes! More cache is best on cloud to speed things along.
I ran a few experiments with different dictionary caching setups. What stood out to me was how long backfilling 10M events took under each one:
- With no caching, filling 10M events with group columns takes 757s
- With small, 10k item caches, filling these columns takes 177s
- With a larger cache (1M persons, 10M distinct_ids), filling takes 11s
The largest relative win comes from adding even minimal caching, but the gains continue with larger cache sizes. There are a few other knobs which sped things up as well:
See suggested dictionary setup
DROP DICTIONARY IF EXISTS groups_dict;
CREATE DICTIONARY IF NOT EXISTS groups_dict
(
team_id Int64,
group_type_index UInt8,
group_key String,
group_properties String,
created_at DateTime
)
PRIMARY KEY team_id, group_type_index, group_key
SOURCE(CLICKHOUSE(TABLE tmp_groups_0006 DB 'default'))
LAYOUT(complex_key_cache(size_in_cells 1000000 max_threads_for_updates 6 allow_read_expired_keys 1))
Lifetime(60000);
DROP DICTIONARY IF EXISTS person_distinct_id2_dict;
CREATE DICTIONARY IF NOT EXISTS person_distinct_id2_dict
(
team_id Int64,
distinct_id String,
person_id UUID
)
PRIMARY KEY team_id, distinct_id
SOURCE(CLICKHOUSE(TABLE tmp_person_distinct_id2_0006 DB 'default'))
LAYOUT(complex_key_cache(size_in_cells 50000000 max_threads_for_updates 6 allow_read_expired_keys 1))
Lifetime(60000);
DROP DICTIONARY IF EXISTS person_dict;
CREATE DICTIONARY IF NOT EXISTS person_dict
(
team_id Int64,
id UUID,
properties String,
created_at DateTime
)
PRIMARY KEY team_id, id
SOURCE(CLICKHOUSE(TABLE tmp_person_0006 DB 'default'))
LAYOUT(complex_key_cache(size_in_cells 5000000 max_threads_for_updates 6 allow_read_expired_keys 1))
Lifetime(60000);
This achieved the following results on the test data set:
| name | query_count | hit_rate | found_rate | element_count | load_factor | allocated |
|---|---|---|---|---|---|---|
| person_dict | 1005861016 | 0.9613943403886726 | 0.9999996132666503 | 6309296 | 0.7521266937255859 | 8.22 GiB |
| person_distinct_id2_dict | 1508791524 | 0.9697972739936999 | 0.9997154537302398 | 7295240 | 0.10870754718780518 | 3.37 GiB |
| groups_dict | 5029305080 | 0.9976188046241967 | 0.9999977770288694 | 30856 | 0.02942657470703125 | 56.62 MiB |
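For reference, these stats are presumably pulled from `system.dictionaries`; a query along these lines reproduces the table above:

```sql
SELECT
    name,
    query_count,
    hit_rate,
    found_rate,
    element_count,
    load_factor,
    formatReadableSize(bytes_allocated) AS allocated
FROM system.dictionaries
WHERE name IN ('person_dict', 'person_distinct_id2_dict', 'groups_dict')
```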
On cloud we should use as much memory as we can spare for these caches, since large caches are what makes the backfill complete in a reasonable timeframe (estimating a few days at most).
Note: I also tried out `complex_key_ssd_cache`, but this was discarded due to segfaults and issues on self-hosted external ClickHouse providers.
Caching requirements on self-hosted
For self-hosted, let's choose a conservative caching option (using less than 500MB of RAM on ClickHouse), with the option to increase these parameters while the migration is running. A sketch of what that could look like is below.
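This is only an illustration of the conservative variant for the largest dictionary; the cell count is an assumed starting point, not a tuned value:

```sql
-- Smaller cache to keep dictionary memory usage well under ~500MB of RAM;
-- size_in_cells can be raised while the migration is running if more memory is available.
CREATE DICTIONARY IF NOT EXISTS person_distinct_id2_dict
(
    team_id Int64,
    distinct_id String,
    person_id UUID
)
PRIMARY KEY team_id, distinct_id
SOURCE(CLICKHOUSE(TABLE tmp_person_distinct_id2_0006 DB 'default'))
LAYOUT(complex_key_cache(size_in_cells 1000000 allow_read_expired_keys 1))
LIFETIME(60000);
```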
Extra space requirement for migration
New columns
After filling the new columns on the test dataset, here's how column statistics looked:
| column | parts | rows | on_disk | compressed | uncompressed | marks_size | percentage_of_total |
|---|---|---|---|---|---|---|---|
| person_properties | 545 | 67653790 | 17.97 GiB | 17.97 GiB | 95.40 GiB | 742.34 KiB | 52.7673% |
| properties | 545 | 67653790 | 10.08 GiB | 10.08 GiB | 152.85 GiB | 742.34 KiB | 29.6131% |
| elements_chain | 545 | 67653790 | 2.10 GiB | 2.10 GiB | 27.29 GiB | 742.34 KiB | 6.177% |
| uuid | 545 | 67653790 | 1.00 GiB | 1023.77 MiB | 1.01 GiB | 742.34 KiB | 2.9383% |
| $window_id | 545 | 67653790 | 747.25 MiB | 746.52 MiB | 2.32 GiB | 742.34 KiB | 2.1431% |
| created_at | 545 | 67653790 | 425.51 MiB | 424.79 MiB | 516.16 MiB | 742.34 KiB | 1.2204% |
| timestamp | 545 | 67653790 | 425.51 MiB | 424.79 MiB | 516.16 MiB | 742.34 KiB | 1.2204% |
| $session_id | 545 | 67653790 | 412.40 MiB | 411.68 MiB | 2.31 GiB | 742.34 KiB | 1.1828% |
| distinct_id | 545 | 67653790 | 353.12 MiB | 352.40 MiB | 2.47 GiB | 742.34 KiB | 1.0128% |
| _timestamp | 545 | 67653790 | 237.44 MiB | 236.72 MiB | 258.08 MiB | 742.34 KiB | 0.681% |
| person_id | 545 | 67653790 | 150.04 MiB | 149.31 MiB | 1.01 GiB | 742.34 KiB | 0.4303% |
| group0_properties | 545 | 67653790 | 71.17 MiB | 70.44 MiB | 2.46 GiB | 742.34 KiB | 0.2041% |
| person_created_at | 545 | 67653790 | 65.97 MiB | 65.25 MiB | 516.16 MiB | 742.34 KiB | 0.1892% |
| group2_properties | 545 | 67653790 | 11.72 MiB | 10.99 MiB | 334.65 MiB | 742.34 KiB | 0.0336% |
| $group_0 | 545 | 67653790 | 10.42 MiB | 9.70 MiB | 150.17 MiB | 742.34 KiB | 0.0299% |
| group1_properties | 545 | 67653790 | 9.43 MiB | 8.70 MiB | 394.63 MiB | 742.34 KiB | 0.027% |
| $group_2 | 545 | 67653790 | 6.53 MiB | 5.81 MiB | 136.95 MiB | 742.34 KiB | 0.0187% |
| event | 545 | 67653790 | 5.71 MiB | 4.99 MiB | 1.00 GiB | 742.34 KiB | 0.0164% |
| group0_created_at | 545 | 67653790 | 5.42 MiB | 4.69 MiB | 516.16 MiB | 742.34 KiB | 0.0155% |
| group2_created_at | 545 | 67653790 | 4.64 MiB | 3.92 MiB | 516.16 MiB | 742.34 KiB | 0.0133% |
| group1_created_at | 545 | 67653790 | 3.94 MiB | 3.22 MiB | 516.16 MiB | 742.34 KiB | 0.0113% |
| team_id | 545 | 67653790 | 3.04 MiB | 2.32 MiB | 516.16 MiB | 742.34 KiB | 0.0087% |
| group3_created_at | 545 | 67653790 | 3.00 MiB | 2.28 MiB | 516.16 MiB | 742.34 KiB | 0.0086% |
| group4_created_at | 545 | 67653790 | 3.00 MiB | 2.28 MiB | 516.16 MiB | 742.34 KiB | 0.0086% |
| _offset | 545 | 67653790 | 3.00 MiB | 2.28 MiB | 516.16 MiB | 742.34 KiB | 0.0086% |
| $group_1 | 545 | 67653790 | 2.86 MiB | 2.14 MiB | 118.24 MiB | 742.34 KiB | 0.0082% |
| group3_properties | 545 | 67653790 | 1.02 MiB | 305.01 KiB | 64.52 MiB | 742.34 KiB | 0.0029% |
| $group_3 | 545 | 67653790 | 1.02 MiB | 304.92 KiB | 64.52 MiB | 742.34 KiB | 0.0029% |
| $group_4 | 545 | 67653790 | 1.02 MiB | 304.80 KiB | 64.52 MiB | 742.34 KiB | 0.0029% |
| group4_properties | 545 | 67653790 | 1.02 MiB | 304.80 KiB | 64.52 MiB | 742.34 KiB | 0.0029% |
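Per-column statistics like these can be pulled from `system.parts_columns` with a query roughly like the following (a sketch; the `percentage_of_total` column above is simply each column's share of the summed on-disk size):

```sql
SELECT
    column,
    count() AS parts,
    sum(rows) AS rows,
    formatReadableSize(sum(column_bytes_on_disk)) AS on_disk,
    formatReadableSize(sum(column_data_compressed_bytes)) AS compressed,
    formatReadableSize(sum(column_data_uncompressed_bytes)) AS uncompressed,
    formatReadableSize(sum(column_marks_bytes)) AS marks_size
FROM system.parts_columns
WHERE active AND table = 'sharded_events'
GROUP BY column
ORDER BY sum(column_bytes_on_disk) DESC
```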
~54% of space was taken up by the new columns. The main issue seems to be that the compression ratio isn't as good as for event properties.
This is likely an effect of our sort key: `(team_id, toDate(timestamp), event, cityHash64(distinct_id), cityHash64(uuid))` places the same events from a team near each other, and those tend to have similar payloads, whereas person properties vary from user to user and are therefore more disparate.
In an ideal world, using the new JSON data type would offset the extra space usage (thanks to storing the data more effectively) but this feature is not ready for our usage yet. See https://github.com/PostHog/posthog/issues/10506 for more details.
I'll explore alternative compression options in a follow-up comment as this much space usage hurts our bottom line a bit.
Extra space requirement for temporary tables
On my test instance, these tables took up the following amount of space.
| database | name | total_bytes | formatReadableSize(total_bytes) |
|---|---|---|---|
| default | tmp_person_0006 | 61290502818 | 57.08 GiB |
| default | tmp_person_distinct_id2_0006 | 25103022126 | 23.38 GiB |
| default | tmp_groups_0006 | 13317282 | 12.70 MiB |
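(These sizes come from `system.tables`; a query like the following, with the name filter as an assumption, reproduces them.)

```sql
SELECT
    database,
    name,
    total_bytes,
    formatReadableSize(total_bytes)
FROM system.tables
WHERE name LIKE 'tmp_%_0006'
ORDER BY total_bytes DESC
```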
We're (nearly) doubling the space used by persons, but it's a rounding error compared to what's needed for the events table.
How long will the migration take on Cloud?
The largest backfill tested (with ample memory for caching) took 1323s for ~224M events. This means the migration can be expected to run in less than 3 days if started on each shard separately (and relying on replication).
During this time queries against machines doing the update will be slower as the migration is doing a lot of I/O (I saw >100MB/s write speed regularly).
As such it'd be ideal to run this on a weekend, if needed on multiple weekends.
We could add a low-priority replica to each shard for doing this migration, but given its expected runtime this is likely not needed.
Next measurements
Sadly measuring all of this is taking a lot longer than I was expecting. I'm currently in the middle of:
- Experimenting with different compression codecs
- Performance benchmarking
Will post these summaries once they are ready.
Codecs
I've summarized learnings on different codecs in https://github.com/PostHog/posthog/issues/10616, but the TL;DR is that I think we should use ZSTD(3) which provides a ~5.3x improvement over the default LZ4 compression.
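For illustration, switching the heaviest new columns to that codec would look roughly like this (a sketch; the final column list and rollout are decided in the linked issue):

```sql
-- Change the compression codec for the new person/group property columns.
-- Existing parts keep the old codec until they are merged or explicitly rewritten.
ALTER TABLE sharded_events
    MODIFY COLUMN person_properties String CODEC(ZSTD(3)),
    MODIFY COLUMN group0_properties String CODEC(ZSTD(3));
```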
Performance comparison
I ran a few queries against the new schema to compare it with the old one.
Note all queries are run both with and without page cache to provide realistic results.
Scripts and raw results can be found at https://github.com/PostHog/scratchpad/tree/main/karl/2022-07-14-person-on-events. For each tested query, the median of 3 runs was taken.
Simple trend query
This query should be unaffected by the schema change; it is provided as a baseline.
| query | no cache duration | with page cache duration |
|---|---|---|
| Simple trend query | 243ms | 168ms |
Daily active users query
| query | no cache duration | with page cache duration |
|---|---|---|
| With JOIN against person_distinct_id2 | 1674ms | 1197ms |
| Using events.person_id column | 133ms | 101ms |
Conclusion: This query is sped up significantly.
Note that these measurements are not perfect, as queries below ~1 second might fluctuate a lot.
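For context, the two variants compared above have roughly this shape (a sketch, not the exact benchmark SQL; the real scripts are in the scratchpad repo linked above):

```sql
-- Old schema: resolve person_id via a join on the distinct id mapping table
SELECT count(DISTINCT pdi.person_id)
FROM events e
INNER JOIN (
    SELECT distinct_id, argMax(person_id, version) AS person_id
    FROM person_distinct_id2
    WHERE team_id = 2
    GROUP BY distinct_id
    HAVING argMax(is_deleted, version) = 0
) pdi ON e.distinct_id = pdi.distinct_id
WHERE e.team_id = 2 AND e.timestamp > now() - INTERVAL 7 DAY;

-- New schema: person_id is denormalized onto the event itself
SELECT count(DISTINCT person_id)
FROM events
WHERE team_id = 2 AND timestamp > now() - INTERVAL 7 DAY;
```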
Person properties query
The table includes queries against the event.properties column for reference.
| query | no cache duration | with page cache duration |
|---|---|---|
| Trend query filtering on event.properties | 5664ms | 2022ms |
| Trend query filtering on event.properties (ZSTD(3) compression) | 3338ms | 2192ms |
| Trend query filtering on a materialized event property column | 258ms | 165ms |
| Trend query filtering on person property (with join against person table) * | 44072ms | 25187ms |
| Trend query filtering on person property (with join against person table, materialized column) * | 44002ms | 25195ms |
| Trend query filtering on event.person_properties | 420ms | 356ms |
| Trend query filtering on event.person_properties (ZSTD(3) compression) | 418ms | 383ms |
| Trend query filtering on a materialized property on event.person_properties column | 102ms | 61ms |
Note on * - these queries OOM-ed without increasing max_memory_usage significantly.
Materialized column with join did not show a significant effect due to the size of the team under test.
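The old vs. new shape of the person-property filter looks roughly like this (a sketch using a hypothetical `email` property; the actual benchmark queries are in the scratchpad repo linked above):

```sql
-- Old schema: join person properties in at query time
SELECT count()
FROM events e
INNER JOIN (
    SELECT distinct_id, argMax(person_id, version) AS person_id
    FROM person_distinct_id2 WHERE team_id = 2 GROUP BY distinct_id
) pdi ON e.distinct_id = pdi.distinct_id
INNER JOIN (
    SELECT id, argMax(properties, version) AS properties
    FROM person WHERE team_id = 2 GROUP BY id
) p ON pdi.person_id = p.id
WHERE e.team_id = 2
  AND JSONExtractString(p.properties, 'email') ILIKE '%posthog.com%';

-- New schema: read the denormalized person_properties column directly
SELECT count()
FROM events
WHERE team_id = 2
  AND JSONExtractString(person_properties, 'email') ILIKE '%posthog.com%';
```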
Conclusions
- The new schema absolutely speeds up queries, by an order of magnitude even without the JSON data type
- We'll need to compress with ZSTD(3) for storage and performance reasons, see https://github.com/PostHog/posthog/issues/10616
- We should start investing again into our materialized columns tooling as JSON data type won't be arriving anytime soon.
Next, finally on to implementation!
Closing this out as 0007_persons_and_groups_on_events_backfill.py has been in the wild for some time now and is working.
If I am mistaken, feel free to reopen.